catch ActorCell creation failures for top-level actors #15947
Previously, a failure during e.g. MailboxType.create() would make the user guardian fail, tearing down the whole system as a result. The root cause is a deep bug in the handling of ActorCell creation that we can no longer fix properly, because doing so would change the semantics; hence this fix only targets top-level actors (where the observable difference is an unambiguous improvement). Fixes #15947.
This commit is contained in:
parent
2bc2dcb14a
commit
7cf99134dc
7 changed files with 104 additions and 12 deletions
|
|
@ -5,7 +5,6 @@
|
|||
package akka.actor
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import scala.concurrent.duration._
|
||||
import akka.{ Die, Ping }
|
||||
|
|
@ -14,6 +13,12 @@ import akka.testkit._
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.concurrent.Await
|
||||
import akka.pattern.ask
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.dispatch.MailboxType
|
||||
import akka.dispatch.MessageQueue
|
||||
import com.typesafe.config.Config
|
||||
import akka.ConfigurationException
|
||||
import akka.routing.RoundRobinPool
|
||||
|
||||
object SupervisorSpec {
|
||||
val Timeout = 5.seconds
|
||||
|
|
@ -64,10 +69,40 @@ object SupervisorSpec {
|
|||
case Status.Failure(_) ⇒ /*Ignore*/
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Test helper actor that creates a child for every `Props` message it receives
 * and replies to the sender with the new child's `ActorRef`.
 *
 * Its supervisor strategy forwards every failure it is asked to decide on to
 * `target` as the tuple `(self, sender(), ex)` and then answers with
 * `SupervisorStrategy.Stop`.
 *
 * @param target receives the `(self, sender(), ex)` failure reports
 */
class Creator(target: ActorRef) extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case ex ⇒
|
||||
// NOTE(review): sender() is presumably the failing child at this point — confirm.
target ! ((self, sender(), ex))
|
||||
SupervisorStrategy.Stop
|
||||
}
|
||||
def receive = {
|
||||
case p: Props ⇒ sender() ! context.actorOf(p)
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Builds the `Props` for a `Creator` that reports failures to `target`.
 *
 * @param target actor handed to the `Creator` constructor
 * @param fail   when true, the returned props use the "error-mailbox" mailbox id
 *               (defined in this object's `config`); otherwise the plain props are returned
 */
def creator(target: ActorRef, fail: Boolean = false) = {
|
||||
val p = Props(new Creator(target))
|
||||
if (fail) p.withMailbox("error-mailbox") else p
|
||||
}
|
||||
|
||||
// Single shared error instance: thrown by Mailbox.create and compared against
// `ex.getCause` in the "log pre-creation check failures" tests.
val failure = new AssertionError("deliberate test failure")
|
||||
|
||||
/**
 * Mailbox type whose `create` never returns a queue: it always throws the shared
 * `failure` error. The "error-mailbox" entry in `config` refers to this class by name.
 */
class Mailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
|
||||
throw failure
|
||||
}
|
||||
|
||||
// Configuration for the spec: turns message serialization off and defines the
// "error-mailbox" id, whose mailbox-type is the always-throwing Mailbox class above.
val config = ConfigFactory.parseString("""
|
||||
akka.actor.serialize-messages = off
|
||||
error-mailbox {
|
||||
mailbox-type = "akka.actor.SupervisorSpec$Mailbox"
|
||||
}
|
||||
""")
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
|
||||
class SupervisorSpec extends AkkaSpec(SupervisorSpec.config) with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
|
||||
|
||||
import SupervisorSpec._
|
||||
|
||||
|
|
@ -423,5 +458,43 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit
|
|||
parent ! "testchild"
|
||||
expectMsg("child green")
|
||||
}
|
||||
|
||||
// Four cases that create an actor (or a RoundRobinPool router) from props using the
// failing "error-mailbox", either directly under the system or as a child of a
// Creator. Each case is wrapped in an EventFilter expecting exactly one occurrence
// of the given exception type: ActorInitializationException for the top-level cases,
// ConfigurationException for the child cases.
"log pre-creation check failures" when {
|
||||
|
||||
"creating a top-level actor" in EventFilter[ActorInitializationException](occurrences = 1).intercept {
|
||||
val ref = system.actorOf(creator(testActor, fail = true))
|
||||
watch(ref)
|
||||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
"creating a normal child actor" in EventFilter[ConfigurationException](occurrences = 1).intercept {
|
||||
val top = system.actorOf(creator(testActor))
|
||||
top ! creator(testActor)
|
||||
|
||||
val middle = expectMsgType[ActorRef]
|
||||
middle ! creator(testActor, fail = true)
|
||||
|
||||
expectMsgPF(hint = "ConfigurationException") {
|
||||
// NOTE(review): lowercase `top` and `middle` in this pattern are fresh variable
// bindings that shadow the outer vals; they are not compared with them. Only the
// exception type and its cause are actually checked.
case (top, middle, ex: ConfigurationException) ⇒
|
||||
ex.getCause should ===(failure)
|
||||
}
|
||||
}
|
||||
|
||||
"creating a top-level router" in EventFilter[ActorInitializationException](occurrences = 1).intercept {
|
||||
val ref = system.actorOf(creator(testActor, fail = true).withRouter(RoundRobinPool(1)))
|
||||
watch(ref)
|
||||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
"creating a router" in EventFilter[ConfigurationException](occurrences = 1).intercept {
|
||||
val top = system.actorOf(creator(testActor))
|
||||
top ! creator(testActor)
|
||||
|
||||
val middle = expectMsgType[ActorRef]
|
||||
middle ! creator(testActor, fail = true).withRouter(RoundRobinPool(1))
|
||||
|
||||
expectMsgPF(hint = "ConfigurationException") {
|
||||
// NOTE(review): as above, `top` and `middle` here are new bindings, not equality
// checks against the outer vals.
case (top, middle, ex: ConfigurationException) ⇒
|
||||
ex.getCause should ===(failure)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -621,7 +621,7 @@ private[akka] class ActorCell(
|
|||
|
||||
// future extension point
|
||||
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {
|
||||
case r: RepointableActorRef if async ⇒ r.point()
|
||||
case r: RepointableActorRef if async ⇒ r.point(catchFailures = true)
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
|
|
@ -649,4 +649,3 @@ private[akka] class ActorCell(
|
|||
|
||||
protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,15 +6,14 @@ package akka.actor
|
|||
|
||||
import java.util.{ LinkedList ⇒ JLinkedList }
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
|
||||
import akka.actor.dungeon.ChildrenContainer
|
||||
import akka.event.Logging.Warning
|
||||
import akka.util.Unsafe
|
||||
import akka.dispatch._
|
||||
import akka.dispatch.sysmsg._
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* This actor ref starts out with some dummy cell (by default just enqueuing
|
||||
|
|
@ -76,7 +75,7 @@ private[akka] class RepointableActorRef(
|
|||
swapCell(new UnstartedCell(system, this, props, supervisor))
|
||||
swapLookup(underlying)
|
||||
supervisor.sendSystemMessage(Supervise(this, async))
|
||||
if (!async) point()
|
||||
if (!async) point(false)
|
||||
this
|
||||
case other ⇒ throw new IllegalStateException("initialize called more than once!")
|
||||
}
|
||||
|
|
@ -87,9 +86,16 @@ private[akka] class RepointableActorRef(
|
|||
* modification of the `underlying` field, though it is safe to send messages
|
||||
* at any time.
|
||||
*/
|
||||
def point(): this.type =
|
||||
def point(catchFailures: Boolean): this.type =
|
||||
underlying match {
|
||||
case u: UnstartedCell ⇒
|
||||
val cell =
|
||||
try newCell(u)
|
||||
catch {
|
||||
case NonFatal(ex) if catchFailures ⇒
|
||||
val safeDispatcher = system.dispatchers.defaultGlobalDispatcher
|
||||
new ActorCell(system, this, props, safeDispatcher, supervisor).initWithFailure(ex)
|
||||
}
|
||||
/*
|
||||
* The problem here was that if the real actor (which will start running
|
||||
* at cell.start()) creates children in its constructor, then this may
|
||||
|
|
@ -97,7 +103,6 @@ private[akka] class RepointableActorRef(
|
|||
* children cannot be looked up immediately, e.g. if they shall become
|
||||
* routees.
|
||||
*/
|
||||
val cell = newCell(u)
|
||||
swapLookup(cell)
|
||||
cell.start()
|
||||
u.replaceWith(cell)
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import scala.util.control.Exception.Catcher
|
|||
import akka.dispatch.MailboxType
|
||||
import akka.dispatch.ProducesMessageQueue
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.dispatch.UnboundedMailbox
|
||||
|
||||
private[akka] trait Dispatch { this: ActorCell ⇒
|
||||
|
||||
|
|
@ -80,6 +81,16 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
|||
this
|
||||
}
|
||||
|
||||
/**
 * Initializes this cell for the case where regular cell creation has already failed.
 *
 * Installs a mailbox created from a fresh `UnboundedMailbox` and enqueues a `Create`
 * system message that carries an `ActorInitializationException` wrapping `failure`.
 * Called from `RepointableActorRef.point` when `newCell` throws a non-fatal exception
 * and `catchFailures` is set.
 *
 * @param failure the exception that prevented normal cell creation
 * @return this cell
 */
final def initWithFailure(failure: Throwable): this.type = {
|
||||
// Uses a plain UnboundedMailbox instead of the configured mailbox type.
val mbox = dispatcher.createMailbox(this, new UnboundedMailbox)
|
||||
swapMailbox(mbox)
|
||||
mailbox.setActor(this)
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
val createMessage = Create(Some(ActorInitializationException(self, "failure while creating ActorCell", failure)))
|
||||
mailbox.systemEnqueue(self, createMessage)
|
||||
this
|
||||
}
|
||||
|
||||
/**
|
||||
* Start this cell, i.e. attach it to the dispatcher.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
|
|||
// make sure all nodes are up
|
||||
within(10.seconds) {
|
||||
awaitAssert {
|
||||
Cluster(system).state.members.count(_.status == MemberStatus.Up) should === (4)
|
||||
Cluster(system).state.members.count(_.status == MemberStatus.Up) should ===(4)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -432,7 +432,7 @@ trait TestKitBase {
|
|||
private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
|
||||
val o = receiveOne(max)
|
||||
assert(o ne null, s"timeout ($max) during expectMsgClass waiting for $c")
|
||||
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass}")
|
||||
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)")
|
||||
o.asInstanceOf[C]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -724,7 +724,11 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"),
|
||||
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule")
|
||||
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule"),
|
||||
|
||||
// #15947 catch mailbox creation failures
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.RepointableActorRef.point"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue