diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 41e76dcce7..2bc2ed1540 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps - import org.scalatest.BeforeAndAfterEach import scala.concurrent.duration._ import akka.{ Die, Ping } @@ -14,6 +13,12 @@ import akka.testkit._ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import akka.pattern.ask +import com.typesafe.config.ConfigFactory +import akka.dispatch.MailboxType +import akka.dispatch.MessageQueue +import com.typesafe.config.Config +import akka.ConfigurationException +import akka.routing.RoundRobinPool object SupervisorSpec { val Timeout = 5.seconds @@ -64,10 +69,40 @@ object SupervisorSpec { case Status.Failure(_) ⇒ /*Ignore*/ } } + + class Creator(target: ActorRef) extends Actor { + override val supervisorStrategy = OneForOneStrategy() { + case ex ⇒ + target ! ((self, sender(), ex)) + SupervisorStrategy.Stop + } + def receive = { + case p: Props ⇒ sender() ! context.actorOf(p) + } + } + + def creator(target: ActorRef, fail: Boolean = false) = { + val p = Props(new Creator(target)) + if (fail) p.withMailbox("error-mailbox") else p + } + + val failure = new AssertionError("deliberate test failure") + + class Mailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType { + override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + throw failure + } + + val config = ConfigFactory.parseString(""" +akka.actor.serialize-messages = off +error-mailbox { + mailbox-type = "akka.actor.SupervisorSpec$Mailbox" +} +""") } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { +class SupervisorSpec extends AkkaSpec(SupervisorSpec.config) with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import SupervisorSpec._ @@ -423,5 +458,43 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit parent ! "testchild" expectMsg("child green") } + + "log pre-creation check failures" when { + + "creating a top-level actor" in EventFilter[ActorInitializationException](occurrences = 1).intercept { + val ref = system.actorOf(creator(testActor, fail = true)) + watch(ref) + expectTerminated(ref) + } + + "creating a normal child actor" in EventFilter[ConfigurationException](occurrences = 1).intercept { + val top = system.actorOf(creator(testActor)) + top ! creator(testActor) + val middle = expectMsgType[ActorRef] + middle ! creator(testActor, fail = true) + expectMsgPF(hint = "ConfigurationException") { + case (top, middle, ex: ConfigurationException) ⇒ + ex.getCause should ===(failure) + } + } + + "creating a top-level router" in EventFilter[ActorInitializationException](occurrences = 1).intercept { + val ref = system.actorOf(creator(testActor, fail = true).withRouter(RoundRobinPool(1))) + watch(ref) + expectTerminated(ref) + } + + "creating a router" in EventFilter[ConfigurationException](occurrences = 1).intercept { + val top = system.actorOf(creator(testActor)) + top ! creator(testActor) + val middle = expectMsgType[ActorRef] + middle ! creator(testActor, fail = true).withRouter(RoundRobinPool(1)) + expectMsgPF(hint = "ConfigurationException") { + case (top, middle, ex: ConfigurationException) ⇒ + ex.getCause should ===(failure) + } + } + + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4165f388c0..e5753a4c08 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -621,7 +621,7 @@ private[akka] class ActorCell( // future extension point protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { - case r: RepointableActorRef if async ⇒ r.point() + case r: RepointableActorRef if async ⇒ r.point(catchFailures = true) case _ ⇒ } @@ -649,4 +649,3 @@ private[akka] class ActorCell( protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } - diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 311a1ab721..660dea44e3 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -6,15 +6,14 @@ package akka.actor import java.util.{ LinkedList ⇒ JLinkedList } import java.util.concurrent.locks.ReentrantLock - import scala.annotation.tailrec import scala.collection.immutable - import akka.actor.dungeon.ChildrenContainer import akka.event.Logging.Warning import akka.util.Unsafe import akka.dispatch._ import akka.dispatch.sysmsg._ +import scala.util.control.NonFatal /** * This actor ref starts out with some dummy cell (by default just enqueuing @@ -76,7 +75,7 @@ private[akka] class RepointableActorRef( swapCell(new UnstartedCell(system, this, props, supervisor)) swapLookup(underlying) supervisor.sendSystemMessage(Supervise(this, async)) - if (!async) point() + if (!async) point(false) this case other ⇒ throw new IllegalStateException("initialize called more than once!") } @@ -87,9 +86,16 @@ private[akka] class RepointableActorRef( * modification of the `underlying` field, though it is safe to send messages * at any time. */ - def point(): this.type = + def point(catchFailures: Boolean): this.type = underlying match { case u: UnstartedCell ⇒ + val cell = + try newCell(u) + catch { + case NonFatal(ex) if catchFailures ⇒ + val safeDispatcher = system.dispatchers.defaultGlobalDispatcher + new ActorCell(system, this, props, safeDispatcher, supervisor).initWithFailure(ex) + } /* * The problem here was that if the real actor (which will start running * at cell.start()) creates children in its constructor, then this may @@ -97,7 +103,6 @@ private[akka] class RepointableActorRef( * children cannot be looked up immediately, e.g. if they shall become * routees. */ - val cell = newCell(u) swapLookup(cell) cell.start() u.replaceWith(cell) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index d5a4e93844..0906266843 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -16,6 +16,7 @@ import scala.util.control.Exception.Catcher import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue import akka.serialization.SerializerWithStringManifest +import akka.dispatch.UnboundedMailbox private[akka] trait Dispatch { this: ActorCell ⇒ @@ -80,6 +81,16 @@ private[akka] trait Dispatch { this: ActorCell ⇒ this } + final def initWithFailure(failure: Throwable): this.type = { + val mbox = dispatcher.createMailbox(this, new UnboundedMailbox) + swapMailbox(mbox) + mailbox.setActor(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + val createMessage = Create(Some(ActorInitializationException(self, "failure while creating ActorCell", failure))) + mailbox.systemEnqueue(self, createMessage) + this + } + /** * Start this cell, i.e. attach it to the dispatcher. */ diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 036f10b41a..149577b3e0 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -114,7 +114,7 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding // make sure all nodes are up within(10.seconds) { awaitAssert { - Cluster(system).state.members.count(_.status == MemberStatus.Up) should === (4) + Cluster(system).state.members.count(_.status == MemberStatus.Up) should ===(4) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index a6fcc2fc39..c92392427e 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -432,7 +432,7 @@ trait TestKitBase { private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = { val o = receiveOne(max) assert(o ne null, s"timeout ($max) during expectMsgClass waiting for $c") - assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass}") + assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)") o.asInstanceOf[C] } diff --git a/project/MiMa.scala b/project/MiMa.scala index 12db64fd38..a91720c308 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -724,7 +724,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"), - ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule") + ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule"), + + // #15947 catch mailbox creation failures + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.RepointableActorRef.point"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure") ) ) }