Merging with master

This commit is contained in:
Viktor Klang 2012-08-09 17:08:17 +02:00
commit 7beff1abe7
24 changed files with 658 additions and 256 deletions

View file

@ -147,7 +147,7 @@ object FSMTimingSpec {
}
def resume(actorRef: ActorRef): Unit = actorRef match {
case l: ActorRefWithCell l.resume(inResponseToFailure = false)
case l: ActorRefWithCell l.resume(causedByFailure = null)
case _
}

View file

@ -5,25 +5,24 @@
package akka.actor
import language.postfixOps
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration.intToDurationInt
import scala.math.BigInt.int2bigInt
import scala.util.Random
import scala.util.control.NoStackTrace
import com.typesafe.config.{ ConfigFactory, Config }
import SupervisorStrategy.{ Resume, Restart, Directive }
import SupervisorStrategy.{ Resume, Restart, Stop, Directive }
import akka.actor.SupervisorStrategy.seqThrowable2Decider
import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher }
import akka.pattern.ask
import akka.testkit.{ ImplicitSender, EventFilter, DefaultTimeout, AkkaSpec }
import akka.testkit.{ filterException, duration2TestDuration, TestLatch }
import akka.testkit.TestEvent.Mute
import java.util.concurrent.ConcurrentHashMap
import java.lang.ref.WeakReference
import akka.event.Logging
object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg)
@ -53,12 +52,18 @@ object SupervisorHierarchySpec {
}
}
case class Ready(ref: ActorRef)
case class Died(ref: ActorRef)
case object Abort
case object PingOfDeath
case object PongOfDeath
case class Event(msg: Any) { val time: Long = System.nanoTime }
case class ErrorLog(msg: String, log: Vector[Event])
case class Failure(directive: Directive, log: Vector[Event]) extends RuntimeException with NoStackTrace {
override def toString = "Failure(" + directive + ")"
case class Failure(directive: Directive, stop: Boolean, depth: Int, var failPre: Int, var failPost: Int, val failConstr: Int, stopKids: Int)
extends RuntimeException with NoStackTrace {
override def toString = productPrefix + productIterator.mkString("(", ",", ")")
}
val strategy = OneForOneStrategy() { case Failure(directive, _) directive }
case class Dump(level: Int)
val config = ConfigFactory.parseString("""
hierarchy {
@ -81,15 +86,19 @@ object SupervisorHierarchySpec {
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
override def suspend(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("suspended")
cell.actor match {
case h: Hierarchy h.log :+= Event("suspended " + cell.mailbox.suspendCount)
case _
}
super.suspend(cell)
}
override def resume(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("resumed")
super.resume(cell)
cell.actor match {
case h: Hierarchy h.log :+= Event("resumed " + cell.mailbox.suspendCount)
case _
}
}
}
@ -97,58 +106,203 @@ object SupervisorHierarchySpec {
override def dispatcher(): MessageDispatcher = instance
}
class Hierarchy(depth: Int, breadth: Int, listener: ActorRef) extends Actor {
/*
* This stores structural data of the hierarchy which would otherwise be lost
* upon Restart or would have to be managed by the highest supervisor (which
* is undesirable).
*/
case class HierarchyState(log: Vector[Event], kids: Map[ActorRef, Int], failConstr: Failure)
val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]()
class Hierarchy(size: Int, breadth: Int, listener: ActorRef, myLevel: Int) extends Actor {
var log = Vector.empty[Event]
stateCache.get(self) match {
case hs @ HierarchyState(_, _, f: Failure) if f.failConstr > 0
stateCache.put(self, hs.copy(failConstr = f.copy(failConstr = f.failConstr - 1)))
throw f
case _
}
var failed = false
var suspended = false
def abort(msg: String) {
listener ! ErrorLog(msg, log)
log = Vector(Event("log sent"))
context.parent ! Abort
context stop self
}
def setFlags(directive: Directive): Unit = directive match {
case Restart failed = true
case Resume suspended = true
case _
}
def suspendCount = context.asInstanceOf[ActorCell].mailbox.suspendCount
override def preStart {
if (depth > 1)
for (_ 1 to breadth)
context.watch(context.actorOf(Props(new Hierarchy(depth - 1, breadth, listener)).withDispatcher("hierarchy")))
listener ! self
}
override def postRestart(cause: Throwable) {
cause match {
case Failure(_, l) log = l
}
log :+= Event("restarted")
log :+= Event("started")
listener ! Ready(self)
val s = size - 1 // subtract myself
val kidInfo: Map[ActorRef, Int] =
if (s > 0) {
val kids = Random.nextInt(Math.min(breadth, s)) + 1
val sizes = s / kids
var rest = s % kids
val propsTemplate = Props.empty.withDispatcher("hierarchy")
(1 to kids).map { (id)
val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes
val props = propsTemplate.withCreator(new Hierarchy(kidSize, breadth, listener, myLevel + 1))
(context.watch(context.actorOf(props, id.toString)), kidSize)
}(collection.breakOut)
} else Map()
stateCache.put(self, HierarchyState(log, kidInfo, null))
}
override def supervisorStrategy = strategy
var preRestartCalled = false
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
if (preRestartCalled) abort("preRestart called twice")
else {
log :+= Event("preRestart " + cause)
preRestartCalled = true
cause match {
case f: Failure
context.children.take(f.stopKids) foreach { child
log :+= Event("killing " + child)
context.unwatch(child)
context.stop(child)
}
stateCache.put(self, stateCache.get(self).copy(log = log))
if (f.failPre > 0) {
f.failPre -= 1
throw f
}
case _ stateCache.put(self, stateCache.get(self).copy(log = log))
}
}
}
val unwrap: PartialFunction[Throwable, (Throwable, Throwable)] = {
case x @ PostRestartException(_, f: Failure, _) (f, x)
case x @ ActorInitializationException(_, _, f: Failure) (f, x)
case x (x, x)
}
override val supervisorStrategy = OneForOneStrategy()(unwrap andThen {
case _ if pongsToGo > 0 Resume
case (f: Failure, orig)
if (f.depth > 0) {
setFlags(f.directive)
log :+= Event("escalating " + f + " from " + sender)
throw f.copy(depth = f.depth - 1)
}
val prefix = orig match {
case f: Failure "applying "
case _ "re-applying "
}
log :+= Event(prefix + f + " to " + sender)
if (myLevel > 3 && f.failPost == 0 && f.stop) Stop else f.directive
case (_, x)
log :+= Event("unhandled exception from " + sender + Logging.stackTraceFor(x))
sender ! Dump(0)
context.system.scheduler.scheduleOnce(1 second, self, Dump(0))(context.dispatcher)
Resume
})
override def postRestart(cause: Throwable) {
val state = stateCache.get(self)
log = state.log
log :+= Event("restarted " + suspendCount + " " + cause)
state.kids foreach {
case (child, kidSize)
val name = child.path.name
if (context.actorFor(name).isTerminated) {
listener ! Died(child)
val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1)).withDispatcher("hierarchy")
context.watch(context.actorOf(props, name))
}
}
cause match {
case f: Failure if f.failPost > 0 f.failPost -= 1; throw f
case PostRestartException(`self`, f: Failure, _) if f.failPost > 0 f.failPost -= 1; throw f
case _
}
}
override def postStop {
if (failed || suspended) {
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
} else {
stateCache.put(self, HierarchyState(log, Map(), null))
}
}
var failed = false
var suspended = false
var log = Vector.empty[Event]
def check(msg: Any) = {
def check(msg: Any): Boolean = {
suspended = false
log :+= Event(msg)
if (failed) {
listener ! ErrorLog("processing message while failed", log)
abort("processing message while failed")
failed = false
context stop self
}
false
} else if (context.asInstanceOf[ActorCell].mailbox.isSuspended) {
abort("processing message while suspended")
false
} else if (!Thread.currentThread.getName.startsWith("SupervisorHierarchySpec-hierarchy")) {
abort("running on wrong thread " + Thread.currentThread + " dispatcher=" + context.props.dispatcher + "=>" +
context.asInstanceOf[ActorCell].dispatcher.id)
false
} else true
}
var pongsToGo = 0
def receive = new Receive {
val handler: Receive = {
case f @ Failure(Resume, _) suspended = true; throw f.copy(log = log)
case f: Failure failed = true; throw f.copy(log = log)
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
case Terminated(_) listener ! ErrorLog("terminating", log); context stop self
case f: Failure
setFlags(f.directive)
stateCache.put(self, stateCache.get(self).copy(failConstr = f.copy()))
throw f
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
case Dump(0) abort("dump")
case Dump(level) context.children foreach (_ ! Dump(level - 1))
case Terminated(ref)
/*
* It might be that we acted upon this death already in postRestart
* (if the unwatch() came too late), so just ignore in this case.
*/
val name = ref.path.name
if (pongsToGo == 0 && context.actorFor(name).isTerminated) {
listener ! Died(ref)
val kids = stateCache.get(self).kids(ref)
val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy")
context.watch(context.actorOf(props, name))
}
case Abort abort("terminating")
case PingOfDeath
if (size > 1) {
pongsToGo = context.children.size
context.children foreach (_ ! PingOfDeath)
} else {
context stop self
context.parent ! PongOfDeath
}
case PongOfDeath
pongsToGo -= 1
if (pongsToGo == 0) {
context stop self
context.parent ! PongOfDeath
}
}
override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg)
override def apply(msg: Any) = { check(msg); handler(msg) }
override def apply(msg: Any) = { if (check(msg)) handler(msg) }
}
}
case class Work(n: Int)
case object Work
case class GCcheck(kids: Vector[WeakReference[ActorRef]])
sealed trait Action
case class Ping(ref: ActorRef) extends Action
@ -161,6 +315,7 @@ object SupervisorHierarchySpec {
case object Finishing extends State
case object LastPing extends State
case object Stopping extends State
case object GC extends State
case object Failed extends State
/*
@ -185,6 +340,9 @@ object SupervisorHierarchySpec {
* 100 millis
* - when receiving a Work() while all actors are "pinged", stop the
* hierarchy and go to the Stopping state
* - make sure to remove all actors which die in the course of the test
* from the pinged and idle sets (others will be spawned from within the
* hierarchy)
*
* Finishing:
* - after dealing out the last action, wait for the outstanding "pong"
@ -207,30 +365,27 @@ object SupervisorHierarchySpec {
* - accumulate ErrorLog messages
* - upon termination of the hierarchy send back failed result and print
* the logs, merged and in chronological order.
*
* TODO RK: also test Stop directive, and keep a complete list of all
* actors ever created, then verify after stop()ping the hierarchy that
* all are terminated, transfer them to a WeakHashMap and verify that
* they are indeed GCed
*
* TODO RK: make hierarchy construction stochastic so that it includes
* different breadth (including the degenerate breadth-1 case).
*
* TODO RK: also test Escalate by adding an exception with a `var depth`
* which gets decremented within the supervisor and gets handled when zero
* is reached (Restart resolution)
*
* TODO RK: also test exceptions during recreate
*
* TODO RK: also test recreate including terminating children
*
* TODO RK: also verify that preRestart is not called more than once per instance
*
* Remark about test failures which lead to stopping:
* The FSM needs to know not the send more things to the dead guy, but it
* also must not watch all targets, because the dead guys supervisor also
* watches him and creates a new guy in response to the Terminated; given
* that there is no ordering relationship guaranteed by DeathWatch it could
* happen that the new guy sends his Ready before the FSM has gotten the
* Terminated, which would screw things up big time. Solution is to let the
* supervisor do all, including notifying the FSM of the death of the guy.
*/
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] {
class StressTest(testActor: ActorRef, size: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] {
import context.system
override def supervisorStrategy = strategy
// dont escalate from this one!
override val supervisorStrategy = OneForOneStrategy() {
case f: Failure f.directive
case OriginalRestartException(f: Failure) f.directive
case ActorInitializationException(f: Failure) f.directive
case _ Stop
}
var children = Vector.empty[ActorRef]
var idleChildren = Vector.empty[ActorRef]
@ -250,7 +405,6 @@ object SupervisorHierarchySpec {
Fail(children(pick), if (x > 0.25) Restart else Resume)
})
val familySize = ((1 - BigInt(breadth).pow(depth)) / (1 - breadth)).toInt
var hierarchy: ActorRef = _
override def preRestart(cause: Throwable, msg: Option[Any]) {
@ -265,55 +419,90 @@ object SupervisorHierarchySpec {
testActor ! "stressTestStopped"
}
startWith(Idle, null)
// number of Work packages to execute for the test
startWith(Idle, size * 1000)
when(Idle) {
case Event(Init, _)
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(depth, breadth, self)).withDispatcher("hierarchy")))
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(size, breadth, self, 0)).withDispatcher("hierarchy"), "head"))
setTimer("phase", StateTimeout, 5 seconds, false)
goto(Init)
}
when(Init) {
case Event(ref: ActorRef, _)
if (idleChildren.nonEmpty || pingChildren.nonEmpty)
throw new IllegalStateException("received unexpected child " + children.size)
children :+= ref
if (children.size == familySize) {
idleChildren = children
goto(Stress)
} else stay
case Event(Ready(ref), _)
if (children contains ref) {
testActor ! "children not unique"
stop()
} else {
children :+= ref
if (children.size == size) goto(Stress)
else stay
}
case Event(StateTimeout, _)
testActor ! "only got %d out of %d refs".format(children.size, familySize)
testActor ! "did not get children list"
stop()
}
onTransition {
case Init -> Stress
self ! Work(familySize * 1000)
self ! Work
idleChildren = children
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
setTimer("phase", StateTimeout, 30.seconds.dilated, false)
setTimer("phase", StateTimeout, 50.seconds.dilated, false)
}
val workSchedule = 250.millis
private def random012: Int = Random.nextFloat match {
case x if x > 0.1 0
case x if x > 0.03 1
case _ 2
}
private def bury(ref: ActorRef): Unit = {
val deadGuy = ref.path.elements
val deadGuySize = deadGuy.size
val isChild = (other: ActorRef) other.path.elements.take(deadGuySize) == deadGuy
idleChildren = idleChildren filterNot isChild
pingChildren = pingChildren filterNot isChild
}
var ignoreNotResumedLogs = true
when(Stress) {
case Event(w: Work, _) if idleChildren.isEmpty
context stop hierarchy
goto(Failed)
case Event(Work(x), _) if x > 0
nextJob.next match {
case Ping(ref) ref ! "ping"
case Fail(ref, dir) ref ! Failure(dir, Vector.empty)
}
if (idleChildren.nonEmpty) self ! Work(x - 1)
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))(context.dispatcher)
case Event(Work, _) if idleChildren.isEmpty
context.system.scheduler.scheduleOnce(workSchedule, self, Work)(context.dispatcher)
stay
case Event(Work, x) if x > 0
nextJob.next match {
case Ping(ref) ref ! "ping"
case Fail(ref, dir)
val f = Failure(dir, stop = random012 > 0, depth = random012, failPre = random012, failPost = random012, failConstr = random012,
stopKids = random012 match {
case 0 0
case 1 Random.nextInt(breadth / 2)
case 2 1000
})
ref ! f
}
if (idleChildren.nonEmpty) self ! Work
else context.system.scheduler.scheduleOnce(workSchedule, self, Work)(context.dispatcher)
stay using (x - 1)
case Event(Work, _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event(Died(ref), _)
bury(ref)
stay
case Event(Work(_), _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
stay
case Event(StateTimeout, todo)
log.info("dumping state due to StateTimeout")
log.info("children: " + children.size + " pinged: " + pingChildren.size + " idle: " + idleChildren.size + " work: " + todo)
println(system.asInstanceOf[ActorSystemImpl].printTree)
ignoreNotResumedLogs = false
hierarchy ! Dump(2)
goto(Failed)
}
when(Finishing) {
@ -321,6 +510,9 @@ object SupervisorHierarchySpec {
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(LastPing) else stay
case Event(Died(ref), _)
bury(ref)
if (pingChildren.isEmpty) goto(LastPing) else stay
}
onTransition {
@ -335,56 +527,147 @@ object SupervisorHierarchySpec {
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(Stopping) else stay
case Event(Died(ref), _)
bury(ref)
if (pingChildren.isEmpty) goto(Stopping) else stay
}
onTransition {
case _ -> Stopping context stop hierarchy
case _ -> Stopping
ignoreNotResumedLogs = false
hierarchy ! PingOfDeath
}
when(Stopping, stateTimeout = 5 seconds) {
case Event(PongOfDeath, _) stay
case Event(Terminated(r), _) if r == hierarchy
testActor ! "stressTestSuccessful"
stop
val undead = children filterNot (_.isTerminated)
if (undead.nonEmpty) {
log.info("undead:\n" + undead.mkString("\n"))
testActor ! "stressTestFailed (" + undead.size + " undead)"
stop
} else if (false) {
/*
* This part of the test is normally disabled, because it does not
* work reliably: even though I found only these weak references
* using YourKit just now, GC wouldnt collect them and the test
* failed. Im leaving this code in so that manual inspection remains
* an option (by setting the above condition to true).
*/
val weak = children map (new WeakReference(_))
children = Vector.empty
pingChildren = Set.empty
idleChildren = Vector.empty
context.system.scheduler.scheduleOnce(workSchedule, self, GCcheck(weak))(context.dispatcher)
System.gc()
goto(GC)
} else {
testActor ! "stressTestSuccessful"
stop
}
case Event(StateTimeout, _)
testActor ! "timeout in Stopping"
stop
case Event(e: ErrorLog, _)
errors :+= sender -> e
goto(Failed)
}
when(GC, stateTimeout = 10 seconds) {
case Event(GCcheck(weak), _)
val next = weak filter (_.get ne null)
if (next.nonEmpty) {
println(next.size + " left")
context.system.scheduler.scheduleOnce(workSchedule, self, GCcheck(next))(context.dispatcher)
System.gc()
stay
} else {
testActor ! "stressTestSuccessful"
stop
}
case Event(StateTimeout, _)
testActor ! "timeout in GC"
stop
}
var errors = Vector.empty[(ActorRef, ErrorLog)]
when(Failed, stateTimeout = 5 seconds) {
case Event(e: ErrorLog, _)
errors :+= sender -> e
if (!e.msg.startsWith("not resumed") || !ignoreNotResumedLogs)
errors :+= sender -> e
stay
case Event(Terminated(r), _) if r == hierarchy
printErrors()
testActor ! "stressTestFailed"
stop
case Event(StateTimeout, _)
getErrors(hierarchy, 2)
printErrors()
testActor ! "timeout in Failed"
stop
case Event("pong", _) stay // dont care?
case Event("pong", _) stay // dont care?
case Event(Work, _) stay
case Event(Died(_), _) stay
}
def getErrors(target: ActorRef, depth: Int): Unit = {
target match {
case l: LocalActorRef
l.underlying.actor match {
case h: Hierarchy errors :+= target -> ErrorLog("forced", h.log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log)
}
if (depth > 0) {
l.underlying.children foreach (getErrors(_, depth - 1))
}
}
}
def getErrorsUp(target: ActorRef): Unit = {
target match {
case l: LocalActorRef
l.underlying.actor match {
case h: Hierarchy errors :+= target -> ErrorLog("forced", h.log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log)
}
if (target != hierarchy) getErrorsUp(l.getParent)
}
}
def printErrors(): Unit = {
val merged = errors flatMap {
errors collect {
case (origin, ErrorLog("dump", _)) getErrors(origin, 1)
case (origin, ErrorLog(msg, _)) if msg startsWith "not resumed" getErrorsUp(origin)
}
val merged = errors.sortBy(_._1.toString) flatMap {
case (ref, ErrorLog(msg, log))
println("Error: " + ref + " " + msg)
log map (l (l.time, ref, l.msg.toString))
}
merged.sorted foreach println
merged.sorted.distinct foreach println
}
whenUnhandled {
case Event(Ready(ref), _)
children :+= ref
idleChildren :+= ref
stay
case Event(e: ErrorLog, _)
errors :+= sender -> e
// dont stop the hierarchy, that is going to happen all by itself and in the right order
goto(Failed)
if (e.msg.startsWith("not resumed")) stay
else {
errors :+= sender -> e
// dont stop the hierarchy, that is going to happen all by itself and in the right order
goto(Failed)
}
case Event(StateTimeout, _)
println("pingChildren:\n" + pingChildren.mkString("\n"))
println("pingChildren:\n" + pingChildren.view.map(_.path.toString).toSeq.sorted.mkString("\n"))
ignoreNotResumedLogs = false
context stop hierarchy
goto(Failed)
case Event(Abort, _)
log.info("received Abort")
goto(Failed)
case Event(msg, _)
testActor ! ("received unexpected msg: " + msg)
stop
@ -491,10 +774,14 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
}
"survive being stressed" in {
system.eventStream.publish(Mute(EventFilter[Failure]()))
system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter")))
system.eventStream.publish(Mute(
EventFilter[Failure](),
EventFilter[ActorInitializationException](),
EventFilter[NoSuchElementException]("head of empty list"),
EventFilter.error(start = "changing Resume into Restart"),
EventFilter.warning(start = "received dead ")))
val fsm = system.actorOf(Props(new StressTest(testActor, 6, 3)), "stressTest")
val fsm = system.actorOf(Props(new StressTest(testActor, size = 500, breadth = 6)), "stressTest")
fsm ! FSM.SubscribeTransitionCallBack(system.actorOf(Props(new Actor {
def receive = {

View file

@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
a.resume(inResponseToFailure = false)
a.resume(causedByFailure = null)
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)

View file

@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
val msgs = (1 to 100).toList
for (m msgs) actor ! m
actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog
actor.resume(causedByFailure = null) //Signal the actor to start treating it's message backlog
Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
}

View file

@ -319,4 +319,17 @@ akka {
tick-duration = 100ms
ticks-per-wheel = 512
}
io {
# In bytes, the size of the shared read buffer. In the span 0b..2GiB.
#
read-buffer-size = 8KiB
# Specifies how many ops are done between every descriptor selection
select-interval = 100
# Number of connections that are allowed in the backlog.
# 0 or negative means that the platform default will be used.
default-backlog = 1000
}
}

View file

@ -8,6 +8,7 @@ import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import java.util.regex.Pattern
import scala.annotation.tailrec
/**
* Marker trait to show which Messages are automatically handled by Akka
@ -108,40 +109,49 @@ private[akka] case class SelectParent(next: Any) extends SelectionPath
* For instance, if you try to create an Actor that doesn't extend Actor.
*/
@SerialVersionUID(1L)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
case class IllegalActorStateException private[akka] (message: String) extends AkkaException(message)
/**
* ActorKilledException is thrown when an Actor receives the akka.actor.Kill message
*/
@SerialVersionUID(1L)
class ActorKilledException private[akka] (message: String, cause: Throwable)
extends AkkaException(message, cause)
with NoStackTrace {
def this(msg: String) = this(msg, null)
}
case class ActorKilledException private[akka] (message: String) extends AkkaException(message) with NoStackTrace
/**
* An InvalidActorNameException is thrown when you try to convert something, usually a String, to an Actor name
* which doesn't validate.
*/
@SerialVersionUID(1L)
class InvalidActorNameException(message: String) extends AkkaException(message)
case class InvalidActorNameException(message: String) extends AkkaException(message)
/**
* An ActorInitializationException is thrown when the the initialization logic for an Actor fails.
*
* There is an extractor which works for ActorInitializationException and its subtypes:
*
* {{{
* ex match {
* case ActorInitializationException(actor, message, cause) => ...
* }
* }}}
*/
@SerialVersionUID(1L)
class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable)
class ActorInitializationException protected (actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) {
def this(msg: String) = this(null, msg, null)
def this(actor: ActorRef, msg: String) = this(actor, msg, null)
def getActor: ActorRef = actor
}
object ActorInitializationException {
private[akka] def apply(actor: ActorRef, message: String, cause: Throwable = null): ActorInitializationException =
new ActorInitializationException(actor, message, cause)
private[akka] def apply(message: String): ActorInitializationException = new ActorInitializationException(null, message, null)
def unapply(ex: ActorInitializationException): Option[(ActorRef, String, Throwable)] = Some((ex.getActor, ex.getMessage, ex.getCause))
}
/**
* A PreRestartException is thrown when the preRestart() method failed.
* A PreRestartException is thrown when the preRestart() method failed; this
* exception is not propagated to the supervisor, as it originates from the
* already failed instance, hence it is only visible as log entry on the event
* stream.
*
* @param actor is the actor whose preRestart() hook failed
* @param cause is the exception thrown by that actor within preRestart()
@ -149,9 +159,12 @@ class ActorInitializationException private[akka] (val actor: ActorRef, message:
* @param msg is the message which was optionally passed into preRestart()
*/
@SerialVersionUID(1L)
class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any])
extends ActorInitializationException(actor, "exception in preRestart(" + origCause.getClass + ", " + msg.map(_.getClass) + ")", cause) {
}
case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable, messageOption: Option[Any])
extends ActorInitializationException(actor,
"exception in preRestart(" +
(if (originalCause == null) "null" else originalCause.getClass) + ", " +
(messageOption match { case Some(m: AnyRef) m.getClass; case _ "None" }) +
")", cause)
/**
* A PostRestartException is thrown when constructor or postRestart() method
@ -162,20 +175,33 @@ class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val
* @param origCause is the exception which caused the restart in the first place
*/
@SerialVersionUID(1L)
class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable)
extends ActorInitializationException(actor, "exception post restart (" + origCause.getClass + ")", cause) {
case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable)
extends ActorInitializationException(actor,
"exception post restart (" + (if (originalCause == null) "null" else originalCause.getClass) + ")", cause)
/**
* This is an extractor for retrieving the original cause (i.e. the first
* failure) from a [[akka.actor.PostRestartException]]. In the face of multiple
* nested restarts it will walk the origCause-links until it arrives at a
* non-PostRestartException type.
*/
@SerialVersionUID(1L)
object OriginalRestartException {
def unapply(ex: PostRestartException): Option[Throwable] = {
@tailrec def rec(ex: PostRestartException): Option[Throwable] = ex match {
case PostRestartException(_, _, e: PostRestartException) rec(e)
case PostRestartException(_, _, e) Some(e)
}
rec(ex)
}
}
/**
* InvalidMessageException is thrown when an invalid message is sent to an Actor.
* Technically it's only "null" which is an InvalidMessageException but who knows,
* there might be more of them in the future, or not.
* InvalidMessageException is thrown when an invalid message is sent to an Actor;
* Currently only `null` is an invalid message.
*/
@SerialVersionUID(1L)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
case class InvalidMessageException private[akka] (message: String) extends AkkaException(message)
/**
* A DeathPactException is thrown by an Actor that receives a Terminated(someActor) message
@ -323,7 +349,7 @@ trait Actor {
protected[akka] implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw new ActorInitializationException(
throw ActorInitializationException(
"\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." +
"\n\tYou have to use one of the factory methods to create a new actor. Either use:" +
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +

View file

@ -188,7 +188,7 @@ private[akka] trait Cell {
/**
* Recursively resume this actor and all its children.
*/
def resume(inResponseToFailure: Boolean): Unit
def resume(causedByFailure: Throwable): Unit
/**
* Restart this actor (will recursively restart or stop all children).
*/
@ -300,21 +300,46 @@ private[akka] class ActorCell(
* MESSAGE PROCESSING
*/
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage): Unit = try {
message match {
case Create() create()
case Recreate(cause) faultRecreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() faultSuspend()
case Resume(inRespToFailure) faultResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
@tailrec final def systemInvoke(message: SystemMessage): Unit = {
/*
* When recreate/suspend/resume are received while restarting (i.e. between
* preRestart and postRestart, waiting for children to terminate), these
* must not be executed immediately, but instead queued and released after
* finishRecreate returns. This can only ever be triggered by
* ChildTerminated, and ChildTerminated is not one of the queued message
* types (hence the overwrite further down). Mailbox sets message.next=null
* before systemInvoke, so this will only be non-null during such a replay.
*/
var todo = message.next
try {
message match {
case Create() create()
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Recreate(cause)
recreationOrNull match {
case null faultRecreate(cause)
case r message.next = r.todo; r.todo = message
}
case Suspend()
recreationOrNull match {
case null faultSuspend()
case r message.next = r.todo; r.todo = message
}
case Resume(inRespToFailure)
recreationOrNull match {
case null faultResume(inRespToFailure)
case r message.next = r.todo; r.todo = message
}
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, "error while processing " + message)
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
if (todo != null) systemInvoke(todo)
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
@ -327,7 +352,7 @@ private[akka] class ActorCell(
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
@ -386,7 +411,7 @@ private[akka] class ActorCell(
val instance = props.creator.apply()
if (instance eq null)
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
// If no becomes were issued, the actors behavior is its receive method
behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
@ -407,12 +432,12 @@ private[akka] class ActorCell(
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
throw ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... )
or is missing an appropriate, reachable no-args constructor.
""", i.getCause)
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e)
case NonFatal(e) throw ActorInitializationException(self, "exception during creation", e)
}
}

View file

@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/
def resume(inResponseToFailure: Boolean): Unit
def resume(causedByFailure: Throwable): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
@ -288,7 +288,7 @@ private[akka] class LocalActorRef private[akka] (
/**
* Resumes a suspended actor.
*/
override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure)
override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure)
/**
* Shuts down the actor and its message queue
@ -388,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
override def suspend(): Unit = ()
override def resume(inResponseToFailure: Boolean): Unit = ()
override def resume(causedByFailure: Throwable): Unit = ()
override def stop(): Unit = ()
override def isTerminated = false

View file

@ -253,7 +253,7 @@ abstract class SupervisorStrategy {
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10
directive match {
case Resume resumeChild(child); true
case Resume resumeChild(child, cause); true
case Restart processFailure(context, true, child, cause, stats, children); true
case Stop processFailure(context, false, child, cause, stats, children); true
case Escalate false
@ -265,7 +265,7 @@ abstract class SupervisorStrategy {
* is not the currently failing child</b>. Suspend/resume needs to be done in
* matching pairs, otherwise actors will wake up too soon or never at all.
*/
final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true)
final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(causedByFailure = cause)
/**
* Restart the given child, possibly suspending it first.
@ -273,7 +273,7 @@ abstract class SupervisorStrategy {
* <b>IMPORTANT:</b>
*
* If the child is the currently failing one, it will already have been
* suspended, hence `suspendFirst` is false. If the child is not the
* suspended, hence `suspendFirst` must be false. If the child is not the
* currently failing one, then it did not request this treatment and is
* therefore not prepared to be resumed without prior suspend.
*/

View file

@ -11,7 +11,6 @@ import scala.concurrent.util.Duration
import scala.util.control.NonFatal
import akka.util.ByteString
import java.net.{ SocketAddress, InetSocketAddress }
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{
SelectableChannel,
@ -27,6 +26,8 @@ import scala.collection.mutable
import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import java.util.UUID
import java.io.{ EOFException, IOException }
/**
* IO messages and iteratees.
*
@ -177,6 +178,13 @@ object IO {
*/
case class ReuseAddress(on: Boolean) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.ServerSocketOption]] to set the maximum backlog of connections. 0 or negative means that the platform default will be used.
* For more information see [[http://docs.oracle.com/javase/7/docs/api/java/nio/channels/ServerSocketChannel.html#bind(java.net.SocketAddress, int)]]
* @param numberOfConnections
*/
case class Backlog(numberOfConnections: Int) extends ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_SNDBUF option for this
* [[akka.actor.IO.SocketHandle]].
@ -806,13 +814,30 @@ object IO {
* An IOManager does not need to be manually stopped when not in use as it will
* automatically enter an idle state when it has no channels to manage.
*/
final class IOManager private (system: ActorSystem) extends Extension { //FIXME how about taking an ActorContext
final class IOManager private (system: ExtendedActorSystem) extends Extension { //FIXME how about taking an ActorContext
/**
* A reference to the [[akka.actor.IOManagerActor]] that performs the actual
* IO. It communicates with other actors using subclasses of
* [[akka.actor.IO.IOMessage]].
*/
val actor = system.actorOf(Props[IOManagerActor], "io-manager")
val actor = {
val c = system.settings.config.getConfig("akka.io")
val readBufferSize = {
val sz = c.getBytes("read-buffer-size")
require(sz <= Int.MaxValue && sz > 0)
sz.toInt
}
val selectInterval = {
val i = c.getInt("select-interval")
require(i > 0)
i
}
val defaultBacklog = c.getInt("default-backlog")
system.actorOf(Props(new IOManagerActor(readBufferSize, selectInterval, defaultBacklog)), "io-manager")
}
/**
* Create a ServerSocketChannel listening on an address. Messages will be
@ -898,11 +923,13 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider {
*
* Use [[akka.actor.IOManager]] to retrieve an instance of this Actor.
*/
final class IOManagerActor extends Actor with ActorLogging {
final class IOManagerActor(
val readBufferSize: Int,
val selectInterval: Int,
/** force a select when lastSelect reaches this amount */
val defaultBacklog: Int) extends Actor with ActorLogging {
import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT }
private val bufferSize = 8192 // FIXME TODO: make configurable
private type ReadChannel = ReadableByteChannel with SelectableChannel
private type WriteChannel = WritableByteChannel with SelectableChannel
@ -918,14 +945,11 @@ final class IOManagerActor extends Actor with ActorLogging {
private val closing = mutable.Set.empty[IO.Handle]
/** Buffer used for all reads */
private val buffer = ByteBuffer.allocate(bufferSize)
private val buffer = ByteBuffer.allocate(readBufferSize)
/** a counter that is incremented each time a message is retrieved */
private var lastSelect = 0
/** force a select when lastSelect reaches this amount */
private val selectAt = 100 // FIXME TODO: make configurable
/** true while the selector is open and channels.nonEmpty */
private var running = false
@ -952,10 +976,14 @@ final class IOManagerActor extends Actor with ActorLogging {
}
}
lastSelect += 1
if (lastSelect >= selectAt) select()
if (lastSelect >= selectInterval)
running = select()
}
private def select() {
/**
* @return true if we should be running and false if not
*/
private def select(): Boolean = try {
if (selector.isOpen) {
// TODO: Make select behaviour configurable.
// Blocking 1ms reduces allocations during idle times, non blocking gives better performance.
@ -965,12 +993,13 @@ final class IOManagerActor extends Actor with ActorLogging {
while (keys.hasNext) {
val key = keys.next()
keys.remove()
if (key.isValid) { process(key) }
if (key.isValid) process(key)
}
if (channels.isEmpty) running = false
if (channels.isEmpty) false else running
} else {
running = false
false
}
} finally {
lastSelect = 0
}
@ -994,7 +1023,7 @@ final class IOManagerActor extends Actor with ActorLogging {
def receive = {
case Select
select()
running = select()
if (running) self ! Select
selectSent = running
@ -1002,18 +1031,20 @@ final class IOManagerActor extends Actor with ActorLogging {
val channel = ServerSocketChannel open ()
channel configureBlocking false
var backlog = defaultBacklog
val sock = channel.socket
options foreach {
case IO.ReceiveBufferSize(size) forwardFailure(sock.setReceiveBufferSize(size))
case IO.ReuseAddress(on) forwardFailure(sock.setReuseAddress(on))
case IO.PerformancePreferences(connTime, latency, bandwidth)
forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
case IO.Backlog(number) backlog = number
}
channel.socket bind (address, 1000) // FIXME TODO: make backlog configurable
channel.socket bind (address, backlog)
channels update (server, channel)
channel register (selector, OP_ACCEPT, server)
server.owner ! IO.Listening(server, channel.socket.getLocalSocketAddress())
server.owner ! IO.Listening(server, sock.getLocalSocketAddress())
run()
case IO.Connect(socket, address, options)
@ -1037,27 +1068,21 @@ final class IOManagerActor extends Actor with ActorLogging {
case IO.Write(handle, data)
if (channels contains handle) {
val queue = {
val existing = writes get handle
if (existing.isDefined) existing.get
else {
val q = new WriteBuffer(bufferSize)
writes update (handle, q)
q
}
val queue = writes get handle getOrElse {
val q = new WriteBuffer(readBufferSize)
writes update (handle, q)
q
}
if (queue.isEmpty) addOps(handle, OP_WRITE)
queue enqueue data
if (queue.length >= bufferSize) write(handle, channels(handle).asInstanceOf[WriteChannel])
if (queue.length >= readBufferSize) write(handle, channels(handle).asInstanceOf[WriteChannel])
}
run()
case IO.Close(handle: IO.WriteHandle)
if (writes get handle filterNot (_.isEmpty) isDefined) {
closing += handle
} else {
cleanup(handle, None)
}
//If we still have pending writes, add to set of closing handles
if (writes get handle exists (_.isEmpty == false)) closing += handle
else cleanup(handle, None)
run()
case IO.Close(handle)
@ -1089,12 +1114,12 @@ final class IOManagerActor extends Actor with ActorLogging {
case server: IO.ServerHandle accepted -= server
case writable: IO.WriteHandle writes -= writable
}
channels.get(handle) match {
case Some(channel)
channel.close
channels -= handle
if (!handle.owner.isTerminated) handle.owner ! IO.Closed(handle, cause)
case None
channels.get(handle) foreach {
channel
try channel.close finally {
channels -= handle
if (!handle.owner.isTerminated) handle.owner ! IO.Closed(handle, cause)
}
}
}
@ -1115,7 +1140,7 @@ final class IOManagerActor extends Actor with ActorLogging {
removeOps(socket, OP_CONNECT)
socket.owner ! IO.Connected(socket, channel.socket.getRemoteSocketAddress())
} else {
cleanup(socket, None) // TODO: Add a cause
cleanup(socket, Some(new IllegalStateException("Channel for socket handle [%s] didn't finish connect" format socket)))
}
}
@ -1144,7 +1169,7 @@ final class IOManagerActor extends Actor with ActorLogging {
buffer.clear
val readLen = channel read buffer
if (readLen == -1) {
cleanup(handle, None) // TODO: Add a cause
cleanup(handle, Some(new EOFException("Elvis has left the building")))
} else if (readLen > 0) {
buffer.flip
handle.owner ! IO.Read(handle, ByteString(buffer))

View file

@ -81,7 +81,7 @@ private[akka] class RepointableActorRef(
def suspend(): Unit = underlying.suspend()
def resume(inResponseToFailure: Boolean): Unit = underlying.resume(inResponseToFailure)
def resume(causedByFailure: Throwable): Unit = underlying.resume(causedByFailure)
def stop(): Unit = underlying.stop()
@ -171,7 +171,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
def system: ActorSystem = systemImpl
def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() }
def resume(inResponseToFailure: Boolean): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
def resume(causedByFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
def stop(): Unit = sendSystemMessage(Terminate())
def isTerminated: Boolean = false

View file

@ -69,7 +69,7 @@ trait Stash {
private val mailbox: DequeBasedMessageQueue = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueue queue
case other throw new ActorInitializationException(self, "DequeBasedMailbox required, got: " + other.getClass() + """
case other throw ActorInitializationException(self, "DequeBasedMailbox required, got: " + other.getClass() + """
An (unbounded) deque-based mailbox can be configured as follows:
my-custom-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"

View file

@ -7,12 +7,12 @@ package akka.actor.cell
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter
import scala.util.control.NonFatal
import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef }
import akka.actor.ActorCell
import akka.actor.ActorPath.ElementRegex
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
import akka.actor.ChildRestartStats
private[akka] trait Children { this: ActorCell
@ -113,14 +113,22 @@ private[akka] trait Children { this: ActorCell ⇒
protected def isTerminating = childrenRefs.isTerminating
protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit =
protected def recreationOrNull = childrenRefs match {
case TerminatingChildrenContainer(_, _, r: Recreation) r
case _ null
}
protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child, _, _) if !(skip contains child) child.asInstanceOf[InternalActorRef].suspend()
case ChildRestartStats(child, _, _) if !(exceptFor contains child) child.asInstanceOf[InternalActorRef].suspend()
case _
}
protected def resumeChildren(): Unit =
childrenRefs.stats foreach (_.child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false))
protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child: InternalActorRef, _, _)
child.resume(if (perp == child) causedByFailure else null)
}
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)
@ -180,6 +188,8 @@ private[akka] trait Children { this: ActorCell ⇒
unreserveChild(name)
throw e
}
// mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
if (mailbox ne null) for (_ 1 to mailbox.suspendCount) actor.suspend()
addChild(actor)
actor
}

View file

@ -7,6 +7,7 @@ package akka.actor.cell
import scala.collection.immutable.TreeMap
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
import akka.dispatch.SystemMessage
/**
* INTERNAL API
@ -43,7 +44,8 @@ private[akka] object ChildrenContainer {
sealed trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason
// careful with those system messages, all handling to be taking place in ActorCell.scala!
case class Recreation(cause: Throwable, var todo: SystemMessage = null) extends SuspendReason
case object Termination extends SuspendReason
trait EmptyChildrenContainer extends ChildrenContainer {

View file

@ -62,7 +62,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))

View file

@ -5,11 +5,12 @@
package akka.actor.cell
import scala.annotation.tailrec
import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor }
import akka.dispatch.{ Envelope, ChildTerminated }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
import akka.dispatch.SystemMessage
import akka.event.Logging
private[akka] trait FaultHandling { this: ActorCell
@ -36,10 +37,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
* a restart with dying children)
* might well be replaced by ref to a Cancellable in the future (see #2299)
*/
private var _failed = false
private def isFailed: Boolean = _failed
private def setFailed(): Unit = _failed = true
private def clearFailed(): Unit = _failed = false
private var _failed: ActorRef = null
private def isFailed: Boolean = _failed != null
private def setFailed(perpetrator: ActorRef): Unit = _failed = perpetrator
private def clearFailed(): Unit = _failed = null
private def perpetrator: ActorRef = _failed
/**
* Do re-create the actor in response to a failure.
@ -65,7 +67,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
} else {
// need to keep that suspend counter balanced
faultResume(inResponseToFailure = false)
faultResume(causedByFailure = null)
}
/**
@ -81,15 +83,22 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
/**
* Do resume the actor in response to a failure.
*
* @param inResponseToFailure signifies if it was our own failure which
* @param causedByFailure signifies if it was our own failure which
* prompted this action.
*/
protected def faultResume(inResponseToFailure: Boolean): Unit = {
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) clearFailed()
resumeChildren()
protected def faultResume(causedByFailure: Throwable): Unit = {
if ((actor == null || actor.context == null) && causedByFailure != null) {
system.eventStream.publish(Error(self.path.toString, clazz(actor),
"changing Resume into Restart after " + causedByFailure))
faultRecreate(causedByFailure)
} else {
val perp = perpetrator
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (causedByFailure != null) clearFailed()
resumeChildren(causedByFailure, perp)
}
}
protected def terminate() {
@ -106,7 +115,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
// do not propagate failures during shutdown to the supervisor
setFailed()
setFailed(self)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
}
} else {
@ -115,18 +124,17 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
final def handleInvokeFailure(childrenNotToSuspend: Iterable[ActorRef], t: Throwable, message: String): Unit = {
publish(Error(t, self.path.toString, clazz(actor), message))
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed) try {
suspendNonRecursive()
setFailed()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(`t`), child) Set(child)
case _ Set.empty
case Envelope(Failed(_), child) setFailed(child); Set(child)
case _ setFailed(self); Set.empty
}
suspendChildren(skip)
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
@ -134,7 +142,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
} catch {
case NonFatal(e)
publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling"))
publish(Error(e, self.path.toString, clazz(actor),
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
try children foreach stop
finally finishTerminate()
}
@ -155,30 +164,32 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
}
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try {
try resumeNonRecursive()
finally clearFailed() // must happen in any case, so that failure is propagated
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
// need to keep a snapshot of the surviving children before the new actor instance creates new ones
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
try {
try resumeNonRecursive()
finally clearFailed() // must happen in any case, so that failure is propagated
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage)
}
}
final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match {
@ -186,7 +197,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
case None publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final protected def handleChildTerminated(child: ActorRef): Unit = {
final protected def handleChildTerminated(child: ActorRef): SystemMessage = {
val status = removeChildAndGetStateChange(child)
/*
* if this fails, we do nothing in case of terminating/restarting state,
@ -195,16 +206,16 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
*/
try actor.supervisorStrategy.handleChildTerminated(this, child, children)
catch {
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
case NonFatal(e) handleInvokeFailure(Nil, e, "handleChildTerminated failed")
}
/*
* if the removal changed the state of the (terminating) children container,
* then we are continuing the previously suspended recreate/terminate action
*/
status match {
case Some(ChildrenContainer.Recreation(cause)) finishRecreate(cause, actor)
case Some(ChildrenContainer.Termination) finishTerminate()
case _
case Some(ChildrenContainer.Recreation(cause, todo)) finishRecreate(cause, actor); SystemMessage.reverse(todo)
case Some(ChildrenContainer.Termination) finishTerminate(); null
case _ null
}
}
}

View file

@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac
/**
* INTERNAL API
*/
private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume
private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume
/**
* INTERNAL API
*/

View file

@ -107,6 +107,9 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
@inline
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0
@inline
final def suspendCount: Int = status / suspendUnit
@inline
final def isSuspended: Boolean = (status & suspendMask) != 0
@ -271,7 +274,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
}
// if something happened while processing, fail this actor (most probable: exception in supervisorStrategy)
if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage)
if (failure ne null) actor.handleInvokeFailure(Nil, failure, failure.getMessage)
}
/**

View file

@ -72,7 +72,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
}
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
throw ActorInitializationException("router " + routerConfig + " did not register routees!")
start(sendSupervise = false)
@ -285,7 +285,7 @@ trait Router extends Actor {
val ref = context match {
case x: RoutedActorCell x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
case _ throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
}
final def receive = ({

View file

@ -217,7 +217,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
/**
* Resumes processing of `send` actions for the agent.
*/
def resume(): Unit = updater.resume(inResponseToFailure = false)
def resume(): Unit = updater.resume(causedByFailure = null)
/**
* Closes the agents and makes it eligible for garbage collection.
@ -296,4 +296,4 @@ private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor
}
def update(function: T T): T = ref.single.transformAndGet(function)
}
}

View file

@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] (
def suspend(): Unit = sendSystemMessage(Suspend())
def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure))
def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure))
def stop(): Unit = sendSystemMessage(Terminate())

View file

@ -148,7 +148,7 @@ class CallingThreadDispatcher(
val queue = mbox.queue
queue.enter
runQueue(mbox, queue)
case x throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass)
case x throw ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass)
}
}
@ -171,7 +171,7 @@ class CallingThreadDispatcher(
if (switched && !wasActive) {
runQueue(mbox, queue)
}
case m m.systemEnqueue(actor.self, Resume(false))
case m m.systemEnqueue(actor.self, Resume(causedByFailure = null))
}
}

View file

@ -127,7 +127,7 @@ object TestActorRef {
def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()) match {
case Right(value) value
case Left(exception) throw new ActorInitializationException(null,
case Left(exception) throw ActorInitializationException(null,
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +

View file

@ -532,7 +532,7 @@ object Dependency {
// Compile
val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
val config = "com.typesafe" % "config" % "0.5.0" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.3.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
val scalaStm = "org.scala-tools" %% "scala-stm" % "0.6" // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT