Merging with master

This commit is contained in:
Viktor Klang 2011-09-27 18:02:57 +02:00
parent db8a20ea37
commit 24d9a4d143
16 changed files with 187 additions and 253 deletions

View file

@ -12,12 +12,19 @@ import akka.util.duration._
import akka.config.Config.config
import akka.config.Supervision._
object LoggingReceiveSpec {
class TestLogActor extends Actor {
def receive = { case _ }
}
}
class LoggingReceiveSpec
extends WordSpec
with BeforeAndAfterEach
with BeforeAndAfterAll
with MustMatchers
with TestKit {
import LoggingReceiveSpec._
val level = EventHandler.level
@ -52,7 +59,8 @@ class LoggingReceiveSpec
ignoreMsg {
case EventHandler.Debug(_, s: String)
!s.startsWith("received") && s != "started" && s != "stopping" && s != "restarting" &&
s != "restarted" && !s.startsWith("now supervising") && !s.startsWith("stopped supervising")
s != "restarted" && !s.startsWith("now supervising") && !s.startsWith("stopped supervising") &&
!s.startsWith("now monitoring") && !s.startsWith("stopped monitoring")
case EventHandler.Debug(_, _) true
case EventHandler.Error(_: UnhandledMessageException, _, _) false
case _: EventHandler.Error true
@ -131,36 +139,42 @@ class LoggingReceiveSpec
"log LifeCycle changes if requested" in {
within(2 seconds) {
val supervisor = TestActorRef(Props(new Actor {
def receive = { case _ }
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)))
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)))
val f = Actor.getClass.getDeclaredField("debugLifecycle")
f.setAccessible(true)
f.setBoolean(Actor, true)
val actor = TestActorRef(new Actor {
def receive = {
case _
}
})
val actor = TestActorRef[TestLogActor](Props[TestLogActor].withSupervisor(supervisor))
val actor1 = actor.underlyingActor
expectMsg(EventHandler.Debug(actor1, "started"))
supervisor link actor
expectMsgPF() {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now supervising")
}
expectMsg(EventHandler.Debug(actor1, "started"))
supervisor link actor
expectMsgPF(hint = "now monitoring") {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now monitoring")
}
supervisor unlink actor
expectMsgPF(hint = "stopped monitoring") {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring")
}
actor ! Kill
expectMsg(EventHandler.Debug(actor1, "restarting"))
awaitCond(msgAvailable)
val actor2 = actor.underlyingActor
expectMsgPF() {
expectMsgPF(hint = "restarted") {
case EventHandler.Debug(ref, "restarted") if ref eq actor2 true
}
supervisor unlink actor
expectMsgPF() {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped supervising")
}
actor.stop()
expectMsg(EventHandler.Debug(actor2, "stopping"))
supervisor.stop()

View file

@ -224,7 +224,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
val countDownLatch = new CountDownLatch(2)
val boss = actorOf(Props(new Actor {
def receive = { case m: MaximumNumberOfRestartsWithinTimeRangeReached maxNoOfRestartsLatch.open }
def receive = { case t: Terminated maxNoOfRestartsLatch.open }
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, Some(1000))))
val slave = actorOf(Props(new Actor {

View file

@ -51,7 +51,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
val countDownMax = new CountDownLatch(1)
val boss = actorOf(Props(new Actor {
protected def receive = {
case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) countDownMax.countDown()
case Terminated(_, _) countDownMax.countDown()
}
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 1, 5000)))

View file

@ -60,8 +60,8 @@ object SupervisorSpec {
val temp = actorOf(Props[PingPongActor].withSupervisor(self))
override def receive = {
case Die (temp.?(Die, TimeoutMillis)).get
case _: MaximumNumberOfRestartsWithinTimeRangeReached
case Die (temp.?(Die, TimeoutMillis)).get
case _: Terminated
}
}

View file

@ -53,28 +53,22 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
}
case class Failed(actor: ActorRef, cause: Throwable, recoverable: Boolean, timesRestarted: Int, restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful
case class Failed(@BeanProperty actor: ActorRef,
@BeanProperty cause: Throwable,
@BeanProperty recoverable: Boolean,
@BeanProperty timesRestarted: Int,
@BeanProperty restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful
case class ChildTerminated(child: ActorRef, cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful
case class Link(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
case class Unlink(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful
case object Kill extends AutoReceivedMessage with PossiblyHarmful
case object ReceiveTimeout extends PossiblyHarmful
case class MaximumNumberOfRestartsWithinTimeRangeReached(
@BeanProperty victim: ActorRef,
@BeanProperty maxNrOfRetries: Option[Int],
@BeanProperty withinTimeRange: Option[Int],
@BeanProperty lastExceptionCausingRestart: Throwable) //FIXME should be removed and replaced with Terminated
case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable)
// Exceptions for Actors
@ -609,9 +603,7 @@ trait Actor {
case HotSwap(code, discardOld) become(code(self), discardOld)
case RevertHotSwap unbecome()
case f: Failed context.handleFailure(f)
case Link(child) self.link(child)
case Unlink(child) self.unlink(child)
case UnlinkAndStop(child) self.unlink(child); child.stop()
case ct: ChildTerminated context.handleChildTerminated(ct)
case Kill throw new ActorKilledException("Kill")
case PoisonPill
val ch = channel

View file

@ -4,15 +4,23 @@
package akka.actor
import akka.event.EventHandler
import akka.config.Supervision._
import akka.config.Supervision.{
AllForOnePermanentStrategy,
AllForOneTemporaryStrategy,
FaultHandlingStrategy,
OneForOnePermanentStrategy,
OneForOneTemporaryStrategy,
Temporary,
Permanent
}
import akka.dispatch._
import akka.util._
import java.util.{ Collection JCollection }
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Stack
import akka.event.{ DumbMonitoring, EventHandler }
import java.util.concurrent.{ ConcurrentLinkedQueue, ScheduledFuture, ConcurrentHashMap, TimeUnit }
/**
* The actor context - the view of the actor cell from the actor.
@ -45,7 +53,9 @@ private[akka] trait ActorContext {
def dispatcher: MessageDispatcher
def handleFailure(fail: Failed)
def handleFailure(fail: Failed): Unit
def handleChildTerminated(childtermination: ChildTerminated): Unit
}
private[akka] object ActorCell {
@ -68,16 +78,13 @@ private[akka] class ActorCell(
@volatile
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile //Should be a final field
var _supervisor: Option[ActorRef] = None
@volatile //FIXME doesn't need to be volatile
var maxNrOfRetriesCount: Int = 0
@volatile //FIXME doesn't need to be volatile
var restartTimeWindowStartNanos: Long = 0L
lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
val _linkedActors = new ConcurrentLinkedQueue[ActorRef]
@volatile //FIXME doesn't need to be volatile
var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility
@ -104,9 +111,17 @@ private[akka] class ActorCell(
@volatile
var mailbox: Mailbox = _
def start() {
if (props.supervisor.isDefined) props.supervisor.get.link(self)
def start(): Unit = {
mailbox = dispatcher.createMailbox(this)
if (props.supervisor.isDefined) {
props.supervisor.get match {
case l: LocalActorRef
l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, akka.dispatch.Supervise(self), NullChannel))
case other throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can")
}
}
Actor.registry.register(self)
dispatcher.attach(this)
}
@ -136,40 +151,22 @@ private[akka] class ActorCell(
if (isRunning)
dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel))
def link(actorRef: ActorRef): ActorRef = {
guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == self.uuid) return actorRef // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(self)
}
}
if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + actorRef)
actorRef
def link(subject: ActorRef): ActorRef = {
dispatcher.systemDispatch(SystemEnvelope(this, Link(subject), NullChannel))
subject
}
def unlink(actorRef: ActorRef): ActorRef = {
guard.withGuard {
if (_linkedActors.remove(actorRef.uuid) eq null)
throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
actorRef.supervisor = None
if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped supervising " + actorRef)
}
actorRef
def unlink(subject: ActorRef): ActorRef = {
dispatcher.systemDispatch(SystemEnvelope(this, Unlink(subject), NullChannel))
subject
}
def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values)
def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors)
def supervisor: Option[ActorRef] = _supervisor
//TODO FIXME remove this method
def supervisor: Option[ActorRef] = props.supervisor
def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
dispatcher dispatch Envelope(this, message, channel)
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel)
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
@ -216,10 +213,11 @@ private[akka] class ActorCell(
}
} catch {
case e
e.printStackTrace(System.err)
envelope.channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos)
else throw e
if (supervisor.isDefined)
DumbMonitoring.signal(Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get)
else
throw e
}
def suspend(): Unit = dispatcher suspend this
@ -239,20 +237,18 @@ private[akka] class ActorCell(
if (a ne null) a.postStop()
{ //Stop supervised actors
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove()
var a = _linkedActors.poll()
while (a ne null) {
a.stop()
a = _linkedActors.poll()
}
}
} finally {
try {
if (supervisor.isDefined)
supervisor.get ! Failed(self, new ActorKilledException("Stopped"), false, maxNrOfRetriesCount, restartTimeWindowStartNanos) //Death(self, new ActorKilledException("Stopped"), false)
} catch {
case e: ActorInitializationException
// TODO: remove when ! cannot throw anymore
}
val cause = new ActorKilledException("Stopped") //FIXME make this an object, can be reused everywhere
if (supervisor.isDefined) supervisor ! ChildTerminated(self, cause)
DumbMonitoring.signal(Terminated(self, cause))
currentMessage = null
clearActorContext()
}
@ -262,11 +258,14 @@ private[akka] class ActorCell(
try {
if (!mailbox.isClosed) {
envelope.message match {
case Create create(recreation = false)
case Recreate create(recreation = true)
case Suspend suspend()
case Resume resume()
case Terminate terminate()
case Create create(recreation = false)
case Recreate create(recreation = true)
case Link(subject) akka.event.DumbMonitoring.link(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now monitoring " + subject)
case Unlink(subject) akka.event.DumbMonitoring.unlink(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped monitoring " + subject)
case Suspend suspend()
case Resume resume()
case Terminate terminate()
case Supervise(child) if (!_linkedActors.contains(child)) { _linkedActors.offer(child); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + child) }
}
}
} catch {
@ -299,7 +298,10 @@ private[akka] class ActorCell(
channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos) else dispatcher.resume(this)
if (supervisor.isDefined)
DumbMonitoring.signal(Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get)
else
dispatcher.resume(this)
if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected
} finally {
@ -333,10 +335,27 @@ private[akka] class ActorCell(
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass))
fail.actor.stop()
self ! MaximumNumberOfRestartsWithinTimeRangeReached(fail.actor, None, None, fail.cause) //FIXME this should be removed, you should link to an actor to get Terminated messages
case _
if (_supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here
if (supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here
}
}
def handleChildTerminated(ct: ChildTerminated): Unit = {
props.faultHandler match {
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass))
//STOP ALL AND ESCALATE
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass))
//STOP ALL?
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass))
//ESCALATE?
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass))
_linkedActors.remove(ct.child)
case _ throw ct.cause //Escalate problem if not handled here
}
}
@ -379,9 +398,6 @@ private[akka] class ActorCell(
success
}
} else {
// tooManyRestarts
if (supervisor.isDefined)
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(self, maxNrOfRetries, withinTimeRange, reason)
stop()
true // done
}
@ -429,21 +445,16 @@ private[akka] class ActorCell(
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
props.faultHandler.lifeCycle match {
case Temporary
val i = _linkedActors.values.iterator
while (i.hasNext) {
val actorRef = i.next()
i.remove()
actorRef.stop()
//FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist?
if (!i.hasNext && supervisor.isDefined)
supervisor.get ! UnlinkAndStop(self)
{ //Stop supervised actors
var a = _linkedActors.poll()
while (a ne null) {
a.stop()
a = _linkedActors.poll()
}
}
case Permanent
val i = _linkedActors.values.iterator
val i = _linkedActors.iterator
while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange)
}
}

View file

@ -161,8 +161,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
timeout: Timeout,
channel: UntypedChannel): Future[Any]
protected[akka] def supervisor_=(sup: Option[ActorRef])
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int])
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
@ -249,7 +247,7 @@ class LocalActorRef private[akka] (
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
*/
def link(actorRef: ActorRef): ActorRef = actorCell.link(actorRef)
def link(subject: ActorRef): ActorRef = actorCell.link(subject)
/**
* Unlink the actor.
@ -257,7 +255,7 @@ class LocalActorRef private[akka] (
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
*/
def unlink(actorRef: ActorRef): ActorRef = actorCell.unlink(actorRef)
def unlink(subject: ActorRef): ActorRef = actorCell.unlink(subject)
/**
* Returns the supervisor, if there is one.
@ -283,9 +281,6 @@ class LocalActorRef private[akka] (
protected[akka] override def timeout: Long = props.timeout.duration.toMillis // TODO: remove this if possible
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit =
actorCell.supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
actorCell.postMessageToMailbox(message, channel)
@ -391,10 +386,6 @@ private[akka] case class RemoteActorRef private[akka] (
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
@ -487,10 +478,6 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
def supervisor: Option[ActorRef] = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
def suspend(): Unit = unsupported
def resume(): Unit = unsupported

View file

@ -106,7 +106,7 @@ case class SupervisorFactory(val config: SupervisorConfig) {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) Unit) {
sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, Terminated) Unit) {
import Supervisor._
private val _childActors = new CopyOnWriteArrayList[ActorRef]
@ -164,7 +164,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) Unit) extends Actor {
final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, Terminated) Unit) extends Actor {
override def postStop() {
val i = linkedActors.iterator
@ -176,7 +176,7 @@ final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, Maximu
}
def receive = {
case max @ MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) maxRestartsHandler(self, max)
case termination: Terminated maxRestartsHandler(self, termination)
case unknown throw new SupervisorException(
"SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
}

View file

@ -5,7 +5,7 @@
package akka.config
import akka.dispatch.MessageDispatcher
import akka.actor.{ MaximumNumberOfRestartsWithinTimeRangeReached, ActorRef }
import akka.actor.{ Terminated, ActorRef }
import akka.japi.{ Procedure2 }
case class RemoteAddress(val hostname: String, val port: Int)
@ -22,10 +22,10 @@ object Supervision {
sealed abstract class LifeCycle extends ConfigElement
sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]], val lifeCycle: LifeCycle) extends ConfigElement
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) Unit = { (aRef, max) () }) extends Server {
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, Terminated) Unit = { (aRef, max) () }) extends Server {
//Java API
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy, worker.toList)
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler: Procedure2[ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached]) = this(restartStrategy, worker.toList, { (aRef, max) restartHandler.apply(aRef, max) })
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler: Procedure2[ActorRef, Terminated]) = this(restartStrategy, worker.toList, { (aRef, max) restartHandler.apply(aRef, max) })
}
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server {

View file

@ -24,12 +24,15 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel
}
}
sealed trait SystemMessage
sealed trait SystemMessage extends PossiblyHarmful
case object Create extends SystemMessage
case object Recreate extends SystemMessage
case object Suspend extends SystemMessage
case object Resume extends SystemMessage
case object Terminate extends SystemMessage
case class Supervise(child: ActorRef) extends SystemMessage
case class Link(subject: ActorRef) extends SystemMessage
case class Unlink(subject: ActorRef) extends SystemMessage
final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")

View file

@ -2,99 +2,53 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
/*package akka.event
package akka.event
import akka.actor.{ Death, ActorRef }
import akka.config.Supervision.{ FaultHandlingStrategy }
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock
import akka.actor._
import akka.dispatch.SystemEnvelope
trait DeathWatch {
def signal(death: Death): Unit
def signal(fail: Failed, supervisor: ActorRef): Unit
def signal(terminated: Terminated): Unit
}
object Supervision {
case class ActiveEntry(monitoring: Vector[ActorRef] = Vector(), supervising: Vector[ActorRef] = Vector(), strategy: FaultHandlingStrategy)
case class PassiveEntry(monitors: Vector[ActorRef] = Vector(), supervisor: Option[ActorRef] = None)
trait Monitoring {
def link(monitor: ActorRef, monitored: ActorRef): Unit
def unlink(monitor: ActorRef, monitored: ActorRef): Unit
}
trait Supervision { self: DeathWatch =>
object DumbMonitoring extends DeathWatch with Monitoring {
import Supervision._
val monitoring = new akka.util.Index[ActorRef, ActorRef] //Key == monitored, Values == monitors
val guard = new ReentrantReadWriteLock
val read = guard.readLock()
val write = guard.writeLock()
def signal(fail: Failed, supervisor: ActorRef): Unit =
supervisor match {
case l: LocalActorRef l ! fail //FIXME, should Failed be a system message ? => l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, fail, NullChannel))
case other throw new IllegalStateException("Supervision only works for local actors currently")
}
val activeEntries = new ConcurrentHashMap[ActorRef, ActiveEntry](1024)
val passiveEntries = new ConcurrentHashMap[ActorRef, PassiveEntry](1024)
def signal(terminated: Terminated): Unit = {
val monitors = monitoring.remove(terminated.actor)
if (monitors.isDefined)
monitors.get.foreach(_ ! terminated)
}
def registerMonitorable(monitor: ActorRef, monitorsSupervisor: Option[ActorRef], faultHandlingStrategy: FaultHandlingStrategy) {
read.lock()
try {
activeEntries.putIfAbsent(monitor, ActiveEntry(strategy = faultHandlingStrategy))
passiveEntries.putIfAbsent(monitor, PassiveEntry(supervisor = monitorsSupervisor))
} finally {
read.unlock()
def link(monitor: ActorRef, monitored: ActorRef): Unit = {
if (monitored.isShutdown) monitor ! Terminated(monitored, new ActorKilledException("Already terminated when linking"))
else { // FIXME race between shutting down
monitoring.put(monitored, monitor)
}
}
def deregisterMonitorable(monitor: ActorRef) {
read.lock()
try {
activeEntries.remove(monitor)
passiveEntries.remove(monitor)
} finally {
read.unlock()
}
}
def startMonitoring(monitor: ActorRef, monitored: ActorRef): ActorRef = {
def addActiveEntry(): ActorRef =
activeEntries.get(monitor) match {
case null => null//He's stopped or not started, which is unlikely
case entry =>
val updated = entry.copy(monitoring = entry.monitoring :+ monitored)
if (activeEntries.replace(monitor, entry, updated))
monitored
else
addActiveEntry()
}
def addPassiveEntry(): ActorRef =
activeEntries.get(monitored) match {
case null => null//The thing we're trying to monitor isn't registered, abort
case _ =>
passiveEntries.get(monitored) match {
case null =>
passiveEntries.putIfAbsent(monitored, PassiveEntry(monitors = Vector(monitor))) match {
case null => monitored//All good
case _ => addPassiveEntry()
}
case existing =>
val updated = existing.copy(monitors = existing.monitors :+ monitor)
if (passiveEntries.replace(monitored, existing, updated))
monitored
else
addPassiveEntry()
}
}
read.lock()
try {
addActiveEntry()
addPassiveEntry()
} finally {
read.unlock()
}
}
def stopMonitoring(monitor: ActorRef, monitored: ActorRef, strategy: FaultHandlingStrategy, supervise: Boolean): ActorRef = {
monitored
def unlink(monitor: ActorRef, monitored: ActorRef): Unit = {
monitoring.remove(monitored, monitor)
}
}
*/
/*
* Scenarios that can occur:

View file

@ -100,8 +100,6 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
// for testing...
case Stat
tryReply(Stats(_delegates length))
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case Terminated(victim, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case msg

View file

@ -112,6 +112,17 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] {
} else false //Remove failed
}
def remove(key: K): Option[Iterable[V]] = {
val set = container get key
if (set ne null) {
set.synchronized {
container.remove(key, set)
Some(scala.collection.JavaConverters.collectionAsScalaIterableConverter(set).asScala)
}
} else None //Remove failed
}
/**
* @return true if the underlying containers is empty, may report false negatives when the last remove is underway
*/

View file

@ -36,24 +36,7 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor
override def toString = "TestActor[" + address + ":" + uuid + "]"
override def equals(other: Any) =
other.isInstanceOf[TestActorRef[_]] &&
other.asInstanceOf[TestActorRef[_]].uuid == uuid
/**
* Override to check whether the new supervisor is running on the CallingThreadDispatcher,
* as it should be. This can of course be tricked by linking before setting the dispatcher before starting the
* supervisor, but then you just asked for trouble.
*/
override def supervisor_=(a: Option[ActorRef]) {
a match { //TODO This should probably be removed since the Supervisor could be a remote actor for all we know
case Some(l: LocalActorRef) if !l.underlying.dispatcher.isInstanceOf[CallingThreadDispatcher]
EventHandler.warning(this, "supervisor " + l + " does not use CallingThreadDispatcher")
case _
}
super.supervisor_=(a)
}
override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].uuid == uuid
}
object TestActorRef {

View file

@ -42,11 +42,11 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[
case Ev(SetIgnore(ign)) stay using ign
case Ev(StateTimeout)
stop
case Event(x: AnyRef, ign)
val ignore = ign map (z if (z isDefinedAt x) z(x) else false) getOrElse false
if (!ignore) {
case Event(x: AnyRef, data)
val observe = data map (ignoreFunc if (ignoreFunc isDefinedAt x) !ignoreFunc(x) else true) getOrElse true
if (observe)
queue.offerLast(RealMessage(x, channel))
}
stay
}
initialize
@ -269,10 +269,10 @@ trait TestKitLight {
*
* @return the received object as transformed by the partial function
*/
def expectMsgPF[T](max: Duration = Duration.MinusInf)(f: PartialFunction[Any, T]): T = {
def expectMsgPF[T](max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, T]): T = {
val _max = if (max eq Duration.MinusInf) remaining else max.dilated
val o = receiveOne(_max)
assert(o ne null, "timeout during expectMsg")
assert(o ne null, "timeout during expectMsg: " + hint)
assert(f.isDefinedAt(o), "does not match: " + o)
f(o)
}

View file

@ -224,25 +224,6 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
a.underlying.dispatcher.getClass must be(classOf[CallingThreadDispatcher])
}
"warn about scheduled supervisor" in {
val boss = Actor.actorOf(new Actor { def receive = { case _ } })
val ref = TestActorRef[WorkerActor]
val filter = EventFilter.custom(_ true)
EventHandler.notify(TestEvent.Mute(filter))
val log = TestActorRef[Logger]
EventHandler.addListener(log)
val eventHandlerLevel = EventHandler.level
EventHandler.level = EventHandler.WarningLevel
boss link ref
val la = log.underlyingActor
la.count must be(1)
la.msg must (include("supervisor") and include("CallingThreadDispatcher"))
EventHandler.level = eventHandlerLevel
EventHandler.removeListener(log)
EventHandler.notify(TestEvent.UnMute(filter))
}
"proxy apply for the underlying actor" in {
val ref = TestActorRef[WorkerActor]
intercept[IllegalActorStateException] { ref("work") }