diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 3056dc9e95..a003d25757 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -227,7 +227,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { contextStackMustBeEmpty } - filterException[java.lang.IllegalStateException] { + EventFilter[ActorInitializationException](occurrences = 1) intercept { (intercept[java.lang.IllegalStateException] { wrap(result ⇒ actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result))))))) @@ -257,14 +257,14 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) val readA = in.readObject - a.isInstanceOf[LocalActorRef] must be === true - readA.isInstanceOf[LocalActorRef] must be === true + a.isInstanceOf[ActorRefWithCell] must be === true + readA.isInstanceOf[ActorRefWithCell] must be === true (readA eq a) must be === true } val ser = new JavaSerializer(esys) val readA = ser.fromBinary(bytes, None) - readA.isInstanceOf[LocalActorRef] must be === true + readA.isInstanceOf[ActorRefWithCell] must be === true (readA eq a) must be === true } @@ -369,13 +369,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val timeout = Timeout(20000) val ref = system.actorOf(Props(new Actor { def receive = { - case 5 ⇒ sender.tell("five") - case null ⇒ sender.tell("null") + case 5 ⇒ sender.tell("five") + case 0 ⇒ sender.tell("null") } })) val ffive = (ref.ask(5)(timeout)).mapTo[String] - val fnull = (ref.ask(null)(timeout)).mapTo[String] + val fnull = (ref.ask(0)(timeout)).mapTo[String] ref ! PoisonPill Await.result(ffive, timeout.duration) must be("five") diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 7ae79fea34..1a2d64bb41 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -10,6 +10,9 @@ import akka.dispatch.Await import akka.util.duration._ import scala.collection.JavaConverters import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } +import akka.pattern.ask +import akka.util.Timeout +import akka.dispatch.Future class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -21,8 +24,46 @@ object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider // Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains class TestExtension(val system: ExtendedActorSystem) extends Extension +object ActorSystemSpec { + + class Waves extends Actor { + var master: ActorRef = _ + var terminaters = Set[ActorRef]() + + def receive = { + case n: Int ⇒ + master = sender + terminaters = Set() ++ (for (i ← 1 to n) yield { + val man = context.watch(context.system.actorOf(Props[Terminater])) + man ! "run" + man + }) + case Terminated(child) if terminaters contains child ⇒ + terminaters -= child + if (terminaters.isEmpty) { + master ! "done" + context stop self + } + } + + override def preRestart(cause: Throwable, msg: Option[Any]) { + if (master ne null) { + master ! "failed with " + cause + " while processing " + msg + } + context stop self + } + } + + class Terminater extends Actor { + def receive = { + case "run" ⇒ context.stop(self) + } + } + +} + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") { +class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender { "An ActorSystem" must { @@ -112,6 +153,35 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt }.getMessage must be("Must be called prior to system shutdown.") } + "reliably create waves of actors" in { + import system.dispatcher + implicit val timeout = Timeout(30 seconds) + val waves = for (i ← 1 to 3) yield system.actorOf(Props[ActorSystemSpec.Waves]) ? 50000 + Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done") + } + + "reliable deny creation of actors while shutting down" in { + val system = ActorSystem() + system.scheduler.scheduleOnce(200 millis) { system.shutdown() } + var failing = false + var created = Vector.empty[ActorRef] + while (!system.isTerminated && system.uptime < 5) { + try { + val t = system.actorOf(Props[ActorSystemSpec.Terminater]) + failing must not be true // because once failing => always failing (it’s due to shutdown) + created :+= t + } catch { + case _: IllegalStateException ⇒ failing = true + } + } + if (system.uptime >= 5) { + println(created.last) + println(system.asInstanceOf[ExtendedActorSystem].printTree) + system.uptime must be < 5L + } + created filter (ref ⇒ !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq()) + } + } -} \ No newline at end of file +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 97eec5be01..8a21f5f070 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -143,6 +143,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout result must be(Seq(1, 2, 3)) } } + + "be able to watch a child with the same name after the old died" in { + val parent = system.actorOf(Props(new Actor { + def receive = { + case "NKOTB" ⇒ + val currentKid = context.watch(context.actorOf(Props(ctx ⇒ { case "NKOTB" ⇒ ctx stop ctx.self }), "kid")) + currentKid forward "NKOTB" + context become { + case Terminated(`currentKid`) ⇒ + testActor ! "GREEN" + context unbecome + } + } + })) + + parent ! "NKOTB" + expectMsg("GREEN") + parent ! "NKOTB" + expectMsg("GREEN") + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index df47c801bb..76d8df1e92 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -140,13 +140,13 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { object FSMTimingSpec { def suspend(actorRef: ActorRef): Unit = actorRef match { - case l: LocalActorRef ⇒ l.suspend() - case _ ⇒ + case l: ActorRefWithCell ⇒ l.suspend() + case _ ⇒ } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: LocalActorRef ⇒ l.resume() - case _ ⇒ + case l: ActorRefWithCell ⇒ l.resume() + case _ ⇒ } trait State diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 4d83c85b82..da789d9dce 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -3,24 +3,23 @@ */ package akka.actor.dispatch -import org.scalatest.Assertions._ -import akka.testkit._ -import akka.dispatch._ -import akka.util.Timeout -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } -import akka.util.Switch import java.rmi.RemoteException -import org.junit.{ After, Test } -import akka.actor._ -import util.control.NoStackTrace -import akka.actor.ActorSystem -import akka.util.duration._ -import akka.event.Logging.Error +import java.util.concurrent.{ TimeUnit, CountDownLatch, ConcurrentHashMap } +import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger } + +import org.junit.runner.RunWith +import org.scalatest.Assertions.{ fail, assert } +import org.scalatest.junit.JUnitRunner + import com.typesafe.config.Config -import akka.util.Duration + +import akka.actor._ +import akka.dispatch._ +import akka.event.Logging.Error import akka.pattern.ask +import akka.testkit._ +import akka.util.{ Timeout, Switch, Duration } +import akka.util.duration._ object ActorModelSpec { @@ -201,7 +200,7 @@ object ActorModelSpec { msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(), msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(), restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) { - val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher)) + val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher)) val deadline = System.currentTimeMillis + 1000 try { await(deadline)(stats.suspensions.get() == suspensions) @@ -241,6 +240,13 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher)) + def awaitStarted(ref: ActorRef): Unit = { + awaitCond(ref match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + }, 1 second, 10 millis) + } + protected def interceptedDispatcher(): MessageDispatcherInterceptor protected def dispatcherType: String @@ -280,6 +286,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa implicit val dispatcher = interceptedDispatcher() val start, oneAtATime = new CountDownLatch(1) val a = newTestActor(dispatcher.id) + awaitStarted(a) a ! CountDown(start) assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") @@ -328,7 +335,8 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "not process messages for a suspended actor" in { implicit val dispatcher = interceptedDispatcher() - val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef] + val a = newTestActor(dispatcher.id).asInstanceOf[InternalActorRef] + awaitStarted(a) val done = new CountDownLatch(1) a.suspend a ! CountDown(done) @@ -436,6 +444,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "not double-deregister" in { implicit val dispatcher = interceptedDispatcher() + for (i ← 1 to 1000) system.actorOf(Props.empty) val a = newTestActor(dispatcher.id) a ! DoubleStop awaitCond(statsFor(a, dispatcher).registers.get == 1) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 4060587b73..1a5c7e8661 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -1,8 +1,12 @@ package akka.actor.dispatch import java.util.concurrent.{ TimeUnit, CountDownLatch } -import akka.dispatch.{ Mailbox, Dispatchers } -import akka.actor.{ LocalActorRef, IllegalActorStateException, Actor, Props } + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +import akka.actor.{ Props, ActorRefWithCell, ActorCell, Actor } +import akka.dispatch.Mailbox import akka.testkit.AkkaSpec object BalancingDispatcherSpec { @@ -51,8 +55,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) { "have fast actor stealing work from slow actor" in { val finishedCounter = new CountDownLatch(110) - val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef] - val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef] + val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell] + val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell] var sentToFast = 0 @@ -76,11 +80,11 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) - slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) - fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast - fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > - (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) + fast.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + slow.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast + fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be > + (slow.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount) system.stop(slow) system.stop(fast) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 4f2d61de65..ba025ffe3c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,13 +1,17 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.dispatch -import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import java.util.concurrent.{ TimeUnit, BlockingQueue } -import java.util.concurrent.ConcurrentLinkedQueue -import akka.util._ -import akka.util.duration._ -import akka.testkit.AkkaSpec +import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue } + +import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } + import com.typesafe.config.Config -import akka.actor._ + +import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell } +import akka.testkit.AkkaSpec +import akka.util.duration.intToDurationInt @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -75,7 +79,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn result } - def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) + def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters, system) def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { q must not be null @@ -136,8 +140,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case u: UnboundedMailbox ⇒ u.create(None) - case b: BoundedMailbox ⇒ b.create(None) + case u: UnboundedMailbox ⇒ u.create(None, None) + case b: BoundedMailbox ⇒ b.create(None, None) } } @@ -145,8 +149,8 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox() ⇒ new UnboundedPriorityMailbox(comparator).create(None) - case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None) + case UnboundedMailbox() ⇒ new UnboundedPriorityMailbox(comparator).create(None, None) + case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None, None) } } @@ -158,13 +162,13 @@ object CustomMailboxSpec { """ class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { - override def create(owner: Option[ActorContext]) = owner match { + override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = owner match { case Some(o) ⇒ new MyMailbox(o) case None ⇒ throw new Exception("no mailbox owner given") } } - class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + class MyMailbox(owner: ActorRef) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -174,7 +178,11 @@ class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { "Dispatcher configuration" must { "support custom mailboxType" in { val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher")) - val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue + awaitCond(actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + }, 1 second, 10 millis) + val queue = actor.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox.messageQueue queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox]) } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index a9855fef7d..11f8760320 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -1,12 +1,14 @@ package akka.dispatch -import akka.actor.{ Props, LocalActorRef, Actor } -import akka.testkit.AkkaSpec -import akka.pattern.ask -import akka.util.duration._ -import akka.testkit.DefaultTimeout +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + import com.typesafe.config.Config -import akka.actor.ActorSystem + +import akka.actor.{ Props, InternalActorRef, ActorSystem, Actor } +import akka.pattern.ask +import akka.testkit.{ DefaultTimeout, AkkaSpec } +import akka.util.duration.intToDurationInt object PriorityDispatcherSpec { val config = """ @@ -54,7 +56,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit case i: Int ⇒ acc = i :: acc case 'Result ⇒ sender.tell(acc) } - }).withDispatcher(dispatcherKey)).asInstanceOf[LocalActorRef] + }).withDispatcher(dispatcherKey)).asInstanceOf[InternalActorRef] actor.suspend //Make sure the actor isn't treating any messages, let it buffer the incoming messages diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 5bedc8fc33..77ac5daf49 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -4,15 +4,14 @@ package akka.routing import java.util.concurrent.atomic.AtomicInteger - import org.junit.runner.RunWith - -import akka.actor.{ Props, LocalActorRef, Deploy, Actor, ActorRef } +import akka.actor.{ Props, Deploy, Actor, ActorRef } import akka.ConfigurationException import akka.dispatch.Await import akka.pattern.{ ask, gracefulStop } import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec } import akka.util.duration.intToDurationInt +import akka.actor.UnstartedCell object ConfiguredLocalRoutingSpec { val config = """ @@ -47,6 +46,14 @@ object ConfiguredLocalRoutingSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender { + def routerConfig(ref: ActorRef): RouterConfig = ref match { + case r: RoutedActorRef ⇒ + r.underlying match { + case c: RoutedActorCell ⇒ c.routerConfig + case _: UnstartedCell ⇒ awaitCond(r.isStarted, 1 second, 10 millis); routerConfig(ref) + } + } + "RouterConfig" must { "be picked up from Props" in { @@ -55,7 +62,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con case "get" ⇒ sender ! context.props } }).withRouter(RoundRobinRouter(12)), "someOther") - actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12) + routerConfig(actor) must be === RoundRobinRouter(12) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } @@ -65,7 +72,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con case "get" ⇒ sender ! context.props } }).withRouter(RoundRobinRouter(12)), "config") - actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4) + routerConfig(actor) must be === RandomRouter(4) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } @@ -75,7 +82,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con case "get" ⇒ sender ! context.props } }).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther") - actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12) + routerConfig(actor) must be === RoundRobinRouter(12) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } @@ -85,7 +92,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con case "get" ⇒ sender ! context.props } }).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config") - actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4) + routerConfig(actor) must be === RandomRouter(4) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 35631924cf..a202778fe5 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,10 +12,11 @@ import akka.dispatch.Await import akka.util.Duration import akka.ConfigurationException import com.typesafe.config.ConfigFactory -import akka.pattern.ask +import akka.pattern.{ ask, pipe } import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config import akka.dispatch.Dispatchers +import akka.util.Timeout object RoutingSpec { @@ -171,6 +172,18 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with expectMsg("restarted") } + "must start in-line for context.actorOf()" in { + system.actorOf(Props(new Actor { + def receive = { + case "start" ⇒ + context.actorOf(Props(new Actor { + def receive = { case x ⇒ sender ! x } + }).withRouter(RoundRobinRouter(2))) ? "hello" pipeTo sender + } + })) ! "start" + expectMsg("hello") + } + } "no router" must { @@ -528,7 +541,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } } "support custom router" in { - val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter") + val myrouter = system.actorOf(Props.empty.withRouter(FromConfig), "myrouter") myrouter.isTerminated must be(false) } } @@ -540,7 +553,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "count votes as intended - not as in Florida" in { - val routedActor = system.actorOf(Props().withRouter(VoteCountRouter())) + val routedActor = system.actorOf(Props.empty.withRouter(VoteCountRouter())) routedActor ! DemocratVote routedActor ! DemocratVote routedActor ! RepublicanVote diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java b/akka-actor/src/main/java/akka/actor/AbstractActorCell.java index d6005f463c..95fb7368bc 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java +++ b/akka-actor/src/main/java/akka/actor/AbstractActorCell.java @@ -8,10 +8,14 @@ import akka.util.Unsafe; final class AbstractActorCell { final static long mailboxOffset; + final static long childrenOffset; + final static long nextNameOffset; static { try { mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly")); + childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly")); + nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorRef.java b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java new file mode 100644 index 0000000000..97ef09c501 --- /dev/null +++ b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor; + +import akka.util.Unsafe; + +final class AbstractActorRef { + final static long cellOffset; + + static { + try { + cellOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_cellDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8fc7df93e5..8b9476efe9 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -58,7 +58,7 @@ case object Kill extends Kill { /** * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. */ -case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) +case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage abstract class ReceiveTimeout extends PossiblyHarmful @@ -134,8 +134,7 @@ class ActorInitializationException private[akka] (actor: ActorRef, message: Stri * there might be more of them in the future, or not. */ class InvalidMessageException private[akka] (message: String, cause: Throwable = null) - extends AkkaException(message, cause) - with NoStackTrace { + extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 72793513e2..e739ffc859 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -15,6 +15,7 @@ import akka.serialization.SerializationExtension import akka.event.Logging.LogEventException import collection.immutable.{ TreeSet, TreeMap } import akka.util.{ Unsafe, Duration, Helpers, NonFatal } +import java.util.concurrent.atomic.AtomicLong //TODO: everything here for current compatibility - could be limited more @@ -167,6 +168,78 @@ trait UntypedActorContext extends ActorContext { } +/** + * INTERNAL API + */ +private[akka] trait Cell { + /** + * The “self” reference which this Cell is attached to. + */ + def self: ActorRef + /** + * The system within which this Cell lives. + */ + def system: ActorSystem + /** + * The system internals where this Cell lives. + */ + def systemImpl: ActorSystemImpl + /** + * Recursively suspend this actor and all its children. + */ + def suspend(): Unit + /** + * Recursively resume this actor and all its children. + */ + def resume(): Unit + /** + * Restart this actor (will recursively restart or stop all children). + */ + def restart(cause: Throwable): Unit + /** + * Recursively terminate this actor and all its children. + */ + def stop(): Unit + /** + * Returns “true” if the actor is locally known to be terminated, “false” if + * alive or uncertain. + */ + def isTerminated: Boolean + /** + * The supervisor of this actor. + */ + def parent: InternalActorRef + /** + * All children of this actor, including only reserved-names. + */ + def childrenRefs: ActorCell.ChildrenContainer + /** + * Enqueue a message to be sent to the actor; may or may not actually + * schedule the actor to run, depending on which type of cell it is. + */ + def tell(message: Any, sender: ActorRef): Unit + /** + * Enqueue a message to be sent to the actor; may or may not actually + * schedule the actor to run, depending on which type of cell it is. + */ + def sendSystemMessage(msg: SystemMessage): Unit + /** + * Returns true if the actor is local, i.e. if it is actually scheduled + * on a Thread in the current JVM when run. + */ + def isLocal: Boolean + /** + * If the actor isLocal, returns whether messages are currently queued, + * “false” otherwise. + */ + def hasMessages: Boolean + /** + * If the actor isLocal, returns the number of messages currently queued, + * which may be a costly operation, 0 otherwise. + */ + def numberOfMessages: Int +} + /** * Everything in here is completely Akka PRIVATE. You will not find any * supported APIs in this place. This is not the API you were looking @@ -201,10 +274,18 @@ private[akka] object ActorCell { def children: Iterable[ActorRef] def stats: Iterable[ChildRestartStats] def shallDie(actor: ActorRef): ChildrenContainer + /** + * reserve that name or throw an exception + */ + def reserve(name: String): ChildrenContainer + /** + * cancel a reservation + */ + def unreserve(name: String): ChildrenContainer } trait EmptyChildrenContainer extends ChildrenContainer { - val emptyStats = TreeMap.empty[String, ChildRestartStats] + val emptyStats = TreeMap.empty[String, ChildStats] def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) def remove(child: ActorRef): ChildrenContainer = this @@ -213,6 +294,8 @@ private[akka] object ActorCell { def children: Iterable[ActorRef] = Nil def stats: Iterable[ChildRestartStats] = Nil def shallDie(actor: ActorRef): ChildrenContainer = this + def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved)) + def unreserve(name: String): ChildrenContainer = this override def toString = "no children" } @@ -228,6 +311,8 @@ private[akka] object ActorCell { */ object TerminatedChildrenContainer extends EmptyChildrenContainer { override def add(child: ActorRef): ChildrenContainer = this + override def reserve(name: String): ChildrenContainer = + throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") } /** @@ -236,32 +321,46 @@ private[akka] object ActorCell { * calling context.stop(child) and processing the ChildTerminated() system * message). */ - class NormalChildrenContainer(c: TreeMap[String, ChildRestartStats]) extends ChildrenContainer { + class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer { - def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + def add(child: ActorRef): ChildrenContainer = + new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - def getByName(name: String): Option[ChildRestartStats] = c get name - - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { - case c @ Some(crs) if (crs.child == actor) ⇒ c - case _ ⇒ None + def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { + case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None } - def children: Iterable[ActorRef] = c.values.view.map(_.child) + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { + case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } - def stats: Iterable[ChildRestartStats] = c.values + def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } + + def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats ⇒ c } def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest) + def reserve(name: String): ChildrenContainer = + if (c contains name) + throw new InvalidActorNameException("actor name " + name + " is not unique!") + else new NormalChildrenContainer(c.updated(name, ChildNameReserved)) + + def unreserve(name: String): ChildrenContainer = c.get(name) match { + case Some(ChildNameReserved) ⇒ NormalChildrenContainer(c - name) + case _ ⇒ this + } + override def toString = if (c.size > 20) c.size + " children" else c.mkString("children:\n ", "\n ", "") } object NormalChildrenContainer { - def apply(c: TreeMap[String, ChildRestartStats]): ChildrenContainer = + def apply(c: TreeMap[String, ChildStats]): ChildrenContainer = if (c.isEmpty) EmptyChildrenContainer else new NormalChildrenContainer(c) } @@ -276,7 +375,7 @@ private[akka] object ActorCell { * type of container, depending on whether or not children are left and whether or not * the reason was “Terminating”. */ - case class TerminatingChildrenContainer(c: TreeMap[String, ChildRestartStats], toDie: Set[ActorRef], reason: SuspendReason) + case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) extends ChildrenContainer { def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) @@ -290,19 +389,35 @@ private[akka] object ActorCell { else copy(c - child.path.name, t) } - def getByName(name: String): Option[ChildRestartStats] = c get name - - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { - case c @ Some(crs) if (crs.child == actor) ⇒ c - case _ ⇒ None + def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { + case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None } - def children: Iterable[ActorRef] = c.values.view.map(_.child) + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { + case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } - def stats: Iterable[ChildRestartStats] = c.values + def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } + + def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats ⇒ c } def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) + def reserve(name: String): ChildrenContainer = reason match { + case Termination ⇒ throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating") + case _ ⇒ + if (c contains name) + throw new InvalidActorNameException("actor name " + name + " is not unique!") + else copy(c = c.updated(name, ChildNameReserved)) + } + + def unreserve(name: String): ChildrenContainer = c.get(name) match { + case Some(ChildNameReserved) ⇒ copy(c = c - name) + case _ ⇒ this + } + override def toString = if (c.size > 20) c.size + " children" else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie @@ -316,10 +431,13 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - @volatile var parent: InternalActorRef) extends UntypedActorContext { - import AbstractActorCell.mailboxOffset + @volatile var parent: InternalActorRef) extends UntypedActorContext with Cell { + + import AbstractActorCell.{ mailboxOffset, childrenOffset, nextNameOffset } import ActorCell._ + final def isLocal = true + final def systemImpl = system protected final def guardian = self @@ -353,7 +471,46 @@ private[akka] class ActorCell( var receiveTimeoutData: (Long, Cancellable) = emptyReceiveTimeoutData @volatile - var childrenRefs: ChildrenContainer = EmptyChildrenContainer + private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer + + def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer] + + private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = + Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren) + + @tailrec private def reserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) + } + + @tailrec private def unreserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) + } + + @tailrec private def addChild(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.add(ref)) || addChild(ref) + } + + @tailrec private def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + @tailrec private def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n + else removeChild(ref) + } + + @tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = { + childrenRefs match { + case c: TerminatingChildrenContainer ⇒ swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason) + case _ ⇒ false + } + } private def isTerminating = childrenRefs match { case TerminatingChildrenContainer(_, _, Termination) ⇒ true @@ -365,7 +522,7 @@ private[akka] class ActorCell( case _ ⇒ true } - private def _actorOf(props: Props, name: String): ActorRef = { + private def _actorOf(props: Props, name: String, async: Boolean): ActorRef = { if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(system) ser.serialize(props.creator) match { @@ -376,53 +533,74 @@ private[akka] class ActorCell( } } } - // in case we are currently terminating, swallow creation requests and return EmptyLocalActorRef - if (isTerminating) provider.actorFor(self, Seq(name)) + /* + * in case we are currently terminating, fail external attachChild requests + * (internal calls cannot happen anyway because we are suspended) + */ + if (isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") else { - val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) - childrenRefs = childrenRefs.add(actor) + reserveChild(name) + // this name will either be unreserved or overwritten with a real child below + val actor = + try { + provider.actorOf(systemImpl, props, self, self.path / name, + systemService = false, deploy = None, lookupDeploy = true, async = async) + } catch { + case NonFatal(e) ⇒ + unreserveChild(name) + throw e + } + addChild(actor) actor } } - def actorOf(props: Props): ActorRef = _actorOf(props, randomName()) + def actorOf(props: Props): ActorRef = _actorOf(props, randomName(), async = false) - def actorOf(props: Props, name: String): ActorRef = { + def actorOf(props: Props, name: String): ActorRef = _actorOf(props, checkName(name), async = false) + + private def checkName(name: String): String = { import ActorPath.ElementRegex name match { case null ⇒ throw new InvalidActorNameException("actor name must not be null") case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") - case ElementRegex() ⇒ // this is fine + case ElementRegex() ⇒ name case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) } - childrenRefs.getByName(name) match { - case None ⇒ _actorOf(props, name) - case _ ⇒ throw new InvalidActorNameException("actor name " + name + " is not unique!") - } } + private[akka] def attachChild(props: Props, name: String): ActorRef = + _actorOf(props, checkName(name), async = true) + + private[akka] def attachChild(props: Props): ActorRef = + _actorOf(props, randomName(), async = true) + final def stop(actor: ActorRef): Unit = { - if (childrenRefs.getByRef(actor).isDefined) childrenRefs = childrenRefs.shallDie(actor) + val started = actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + } + if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) actor.asInstanceOf[InternalActorRef].stop() } var currentMessage: Envelope = _ var actor: Actor = _ private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack - @volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status - var nextNameSequence: Long = 0 var watching: Set[ActorRef] = emptyActorRefSet var watchedBy: Set[ActorRef] = emptyActorRefSet - //Not thread safe, so should only be used inside the actor that inhabits this ActorCell + @volatile private var _nextNameDoNotCallMeDirectly = 0L final protected def randomName(): String = { - val n = nextNameSequence - nextNameSequence = n + 1 - Helpers.base64(n) + @tailrec def inc(): Long = { + val current = Unsafe.instance.getLongVolatile(this, nextNameOffset) + if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current + else inc() + } + Helpers.base64(inc()) } - @inline - final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status /** * INTERNAL API @@ -442,6 +620,12 @@ private[akka] class ActorCell( else oldMailbox } + final def hasMessages: Boolean = mailbox.hasMessages + + final def numberOfMessages: Int = mailbox.numberOfMessages + + val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + /** * UntypedActorContext impl */ @@ -449,20 +633,22 @@ private[akka] class ActorCell( final def isTerminated: Boolean = mailbox.isClosed - final def start(): Unit = { + final def start(): this.type = { + /* * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. */ swapMailbox(dispatcher.createMailbox(this)) + mailbox.setActor(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ mailbox.systemEnqueue(self, Create()) - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self)) - // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) + + this } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -500,8 +686,10 @@ private[akka] class ActorCell( final def getChildren(): java.lang.Iterable[ActorRef] = scala.collection.JavaConverters.asJavaIterableConverter(children).asJava - final def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system)) + def tell(message: Any, sender: ActorRef): Unit = + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + + override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) final def sender: ActorRef = currentMessage match { case null ⇒ system.deadLetters @@ -564,7 +752,7 @@ private[akka] class ActorCell( } childrenRefs match { case ct: TerminatingChildrenContainer ⇒ - childrenRefs = ct.copy(reason = Recreation(cause)) + setChildrenTerminationReason(Recreation(cause)) dispatcher suspend this case _ ⇒ doRecreate(cause, failedActor) @@ -622,7 +810,7 @@ private[akka] class ActorCell( childrenRefs match { case ct: TerminatingChildrenContainer ⇒ - childrenRefs = ct.copy(reason = Termination) + setChildrenTerminationReason(Termination) // do not process normal messages while waiting for all children to terminate dispatcher suspend this if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) @@ -631,7 +819,8 @@ private[akka] class ActorCell( } def supervise(child: ActorRef): Unit = if (!isTerminating) { - if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child) + if (childrenRefs.getByRef(child).isEmpty) addChild(child) + handleSupervise(child) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } @@ -646,6 +835,7 @@ private[akka] class ActorCell( case Terminate() ⇒ terminate() case Supervise(child) ⇒ supervise(child) case ChildTerminated(child) ⇒ handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) @@ -706,6 +896,7 @@ private[akka] class ActorCell( msg.message match { case Failed(cause) ⇒ handleFailure(sender, cause) + case t: Terminated ⇒ watching -= t.actor; receiveMessage(t) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) @@ -794,8 +985,7 @@ private[akka] class ActorCell( final def handleChildTerminated(child: ActorRef): Unit = try { childrenRefs match { case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ - val n = tc.remove(child) - childrenRefs = n + val n = removeChild(child) actor.supervisorStrategy.handleChildTerminated(this, child, children) if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" @@ -803,7 +993,7 @@ private[akka] class ActorCell( case _ ⇒ } case _ ⇒ - childrenRefs = childrenRefs.remove(child) + removeChild(child) actor.supervisorStrategy.handleChildTerminated(this, child, children) } } catch { @@ -816,6 +1006,11 @@ private[akka] class ActorCell( } } + protected def handleSupervise(child: ActorRef): Unit = child match { + case r: RepointableActorRef ⇒ r.activate() + case _ ⇒ + } + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index aa93dbcc47..1112b90f31 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -192,7 +192,7 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto // TODO RK investigate Phil’s hash from scala.collection.mutable.HashTable.improve override def hashCode: Int = { - import scala.util.MurmurHash._ + import akka.routing.MurmurHash._ @tailrec def rec(p: ActorPath, h: Int, c: Int, k: Int): Int = p match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0620a73a28..8d42714b00 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -163,10 +163,24 @@ private[akka] trait ActorRefScope { def isLocal: Boolean } +/** + * Refs which are statically known to be local inherit from this Scope + */ private[akka] trait LocalRef extends ActorRefScope { final def isLocal = true } +/** + * RepointableActorRef (and potentially others) may change their locality at + * runtime, meaning that isLocal might not be stable. RepointableActorRef has + * the feature that it starts out “not fully started” (but you can send to it), + * which is why `isStarted` features here; it is not improbable that cluster + * actor refs will have the same behavior. + */ +private[akka] trait RepointableRef extends ActorRefScope { + def isStarted: Boolean +} + /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! @@ -210,6 +224,16 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def isLocal: Boolean } +/** + * Common trait of all actor refs which actually have a Cell, most notably + * LocalActorRef and RepointableActorRef. The former specializes the return + * type of `underlying` so that follow-up calls can use invokevirtual instead + * of invokeinterface. + */ +private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒ + def underlying: Cell +} + /** * This is an internal look-up failure token, not useful for anything else. */ @@ -228,21 +252,21 @@ private[akka] class LocalActorRef private[akka] ( _props: Props, _supervisor: InternalActorRef, override val path: ActorPath) - extends InternalActorRef with LocalRef { + extends ActorRefWithCell with LocalRef { /* - * actorCell.start() publishes actorCell & this to the dispatcher, which - * means that messages may be processed theoretically before the constructor - * ends. The JMM guarantees visibility for final fields only after the end - * of the constructor, so publish the actorCell safely by making it a - * @volatile var which is NOT TO BE WRITTEN TO. The alternative would be to - * move start() outside of the constructor, which would basically require - * us to use purely factory methods for creating LocalActorRefs. + * Safe publication of this class’s fields is guaranteed by mailbox.setActor() + * which is called indirectly from actorCell.start() (if you’re wondering why + * this is at all important, remember that under the JMM final fields are only + * frozen at the _end_ of the constructor, but we are publishing “this” before + * that is reached). */ - @volatile - private var actorCell = newActorCell(_system, this, _props, _supervisor) + private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor) actorCell.start() + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + _supervisor.sendSystemMessage(akka.dispatch.Supervise(this)) + protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = new ActorCell(system, ref, props, supervisor) @@ -313,9 +337,9 @@ private[akka] class LocalActorRef private[akka] ( // ========= AKKA PROTECTED FUNCTIONS ========= - protected[akka] def underlying: ActorCell = actorCell + def underlying: ActorCell = actorCell - override def sendSystemMessage(message: SystemMessage): Unit = underlying.dispatcher.systemDispatch(underlying, message) + override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message) override def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 4c200b204c..bbb84144c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -26,12 +26,12 @@ trait ActorRefProvider { /** * Reference to the supervisor used for all top-level user actors. */ - def guardian: InternalActorRef + def guardian: LocalActorRef /** * Reference to the supervisor used for all top-level system actors. */ - def systemGuardian: InternalActorRef + def systemGuardian: LocalActorRef /** * Dead letter destination for this provider. @@ -104,7 +104,8 @@ trait ActorRefProvider { path: ActorPath, systemService: Boolean, deploy: Option[Deploy], - lookupDeploy: Boolean): InternalActorRef + lookupDeploy: Boolean, + async: Boolean): InternalActorRef /** * Create actor reference for a specified local or remote path. If no such @@ -481,11 +482,10 @@ class LocalActorRefProvider( } } - lazy val guardian: InternalActorRef = - actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None, false) + lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user") - lazy val systemGuardian: InternalActorRef = - actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None, false) + lazy val systemGuardian: LocalActorRef = + new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system") lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) @@ -539,22 +539,20 @@ class LocalActorRefProvider( } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, - systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = { + systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = { props.routerConfig match { - case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path) // create a local actor + case NoRouter ⇒ + if (async) new RepointableActorRef(system, props, supervisor, path).initialize() + else new LocalActorRef(system, props, supervisor, path) case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) - new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path) + val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize() + if (async) ref else ref.activate() } } def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None } -private[akka] class GuardianCell(_system: ActorSystemImpl, _self: InternalActorRef, _props: Props, _parent: InternalActorRef) - extends ActorCell(_system, _self, _props, _parent) { - -} - diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0d13f2451a..eb0f241177 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -422,6 +422,13 @@ abstract class ExtendedActorSystem extends ActorSystem { * creation. */ def dynamicAccess: DynamicAccess + + /** + * For debugging: traverse actor hierarchy and make string representation. + * Careful, this may OOM on large actor systems, and it is only meant for + * helping debugging in case something already went terminally wrong. + */ + private[akka] def printTree: String } private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem { @@ -479,20 +486,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, protected def systemImpl: ActorSystemImpl = this - private[akka] def systemActorOf(props: Props, name: String): ActorRef = { - implicit val timeout = settings.CreationTimeout - Await.result((systemGuardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration) - } + private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name) - def actorOf(props: Props, name: String): ActorRef = { - implicit val timeout = settings.CreationTimeout - Await.result((guardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration) - } + def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name) - def actorOf(props: Props): ActorRef = { - implicit val timeout = settings.CreationTimeout - Await.result((guardian ? CreateRandomNameChild(props)).mapTo[ActorRef], timeout.duration) - } + def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props) def stop(actor: ActorRef): Unit = { implicit val timeout = settings.CreationTimeout @@ -539,10 +537,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def dequeue() = null def hasMessages = false def numberOfMessages = 0 - def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () } //FIXME Why do we need this at all? - val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) { + val deadLetterMailbox: Mailbox = new Mailbox(deadLetterQueue) { becomeClosed() def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) @@ -557,8 +555,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def terminationFuture: Future[Unit] = provider.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian - def guardian: InternalActorRef = provider.guardian - def systemGuardian: InternalActorRef = provider.systemGuardian + def guardian: LocalActorRef = provider.guardian + def systemGuardian: LocalActorRef = provider.systemGuardian def /(actorName: String): ActorPath = guardian.path / actorName def /(path: Iterable[String]): ActorPath = guardian.path / path @@ -682,6 +680,31 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, override def toString: String = lookupRoot.path.root.address.toString + override def printTree: String = { + def printNode(node: ActorRef, indent: String): String = { + node match { + case wc: ActorRefWithCell ⇒ + val cell = wc.underlying + indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " + + (cell match { + case real: ActorCell ⇒ if (real.actor ne null) real.actor.getClass else "null" + case _ ⇒ Logging.simpleName(cell) + }) + + " " + (cell.childrenRefs match { + case ActorCell.TerminatingChildrenContainer(_, toDie, reason) ⇒ + "Terminating(" + reason + ")" + + (toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", "")) + case x ⇒ Logging.simpleName(x) + }) + + (if (cell.childrenRefs.children.isEmpty) "" else "\n") + + (cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n")) + case _ ⇒ + indent + node.path.name + " " + Logging.simpleName(node) + } + } + printNode(actorFor("/"), "") + } + final class TerminationCallbacks extends Runnable with Awaitable[Unit] { private val lock = new ReentrantGuard private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 27a9f346db..76eed2eca9 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -9,11 +9,22 @@ import scala.collection.JavaConversions._ import java.lang.{ Iterable ⇒ JIterable } import akka.util.Duration +/** + * INTERNAL API + */ +private[akka] sealed trait ChildStats + +/** + * INTERNAL API + */ +private[akka] case object ChildNameReserved extends ChildStats + /** * ChildRestartStats is the statistics kept by every parent Actor for every child Actor * and is used for SupervisorStrategies to know how to deal with problems that occur for the children. */ -case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { +case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) + extends ChildStats { //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala new file mode 100644 index 0000000000..ad9a7cb0c4 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -0,0 +1,214 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor + +import akka.util.Unsafe +import scala.annotation.tailrec +import akka.dispatch.SystemMessage +import akka.dispatch.Mailbox +import akka.dispatch.Terminate +import akka.dispatch.Envelope +import akka.dispatch.Supervise +import akka.dispatch.Create +import akka.dispatch.MessageDispatcher +import java.util.concurrent.locks.ReentrantLock +import akka.event.Logging.Warning +import scala.collection.mutable.Queue + +/** + * This actor ref starts out with some dummy cell (by default just enqueuing + * messages into vectors protected by ReentrantLock), it must be initialize()’d + * before it can be sent to, and it will be activate()’d by its supervisor in + * response to the Supervise() message, which will replace the contained Cell + * with a fully functional one, transfer all messages from dummy to real queue + * and swap out the cell ref. + */ +private[akka] class RepointableActorRef( + val system: ActorSystemImpl, + val props: Props, + val supervisor: InternalActorRef, + val path: ActorPath) + extends ActorRefWithCell with RepointableRef { + + import AbstractActorRef.cellOffset + + @volatile private var _cellDoNotCallMeDirectly: Cell = _ + + def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell] + + @tailrec final def swapCell(next: Cell): Cell = { + val old = underlying + if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next) + } + + /** + * Initialize: make a dummy cell which holds just a mailbox, then tell our + * supervisor that we exist so that he can create the real Cell in + * handleSupervise(). + * + * Call twice on your own peril! + * + * This is protected so that others can have different initialization. + */ + def initialize(): this.type = { + swapCell(new UnstartedCell(system, this, props, supervisor)) + supervisor.sendSystemMessage(Supervise(this)) + this + } + + /** + * This method is supposed to be called by the supervisor in handleSupervise() + * to replace the UnstartedCell with the real one. It assumes no concurrent + * modification of the `underlying` field, though it is safe to send messages + * at any time. + */ + def activate(): this.type = { + underlying match { + case u: UnstartedCell ⇒ u.replaceWith(newCell()) + case _ ⇒ // this happens routinely for things which were created async=false + } + this + } + + /** + * This is called by activate() to obtain the cell which is to replace the + * unstarted cell. The cell must be fully functional. + */ + def newCell(): Cell = new ActorCell(system, this, props, supervisor).start() + + def suspend(): Unit = underlying.suspend() + + def resume(): Unit = underlying.resume() + + def stop(): Unit = underlying.stop() + + def restart(cause: Throwable): Unit = underlying.restart(cause) + + def isStarted: Boolean = !underlying.isInstanceOf[UnstartedCell] + + def isTerminated: Boolean = underlying.isTerminated + + def provider: ActorRefProvider = system.provider + + def isLocal: Boolean = underlying.isLocal + + def getParent: InternalActorRef = underlying.parent + + def getChild(name: Iterator[String]): InternalActorRef = + if (name.hasNext) { + name.next match { + case ".." ⇒ getParent.getChild(name) + case "" ⇒ getChild(name) + case other ⇒ + underlying.childrenRefs.getByName(other) match { + case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) + case None ⇒ Nobody + } + } + } else this + + def !(message: Any)(implicit sender: ActorRef = null) = underlying.tell(message, sender) + + def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) + + @throws(classOf[java.io.ObjectStreamException]) + protected def writeReplace(): AnyRef = SerializedActorRef(path) +} + +private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef) + extends Cell { + + /* + * This lock protects all accesses to this cell’s queues. It also ensures + * safe switching to the started ActorCell. + */ + val lock = new ReentrantLock + + // use Envelope to keep on-send checks in the same place + val queue: Queue[Envelope] = Queue() + val systemQueue: Queue[SystemMessage] = Queue() + + def replaceWith(cell: Cell): Unit = { + lock.lock() + try { + /* + * The CallingThreadDispatcher nicely dives under the ReentrantLock and + * breaks things by enqueueing into stale queues from within the message + * processing which happens in-line for sendSystemMessage() and tell(). + * Since this is the only possible way to f*ck things up within this + * lock, double-tap (well, N-tap, really); concurrent modification is + * still not possible because we’re the only thread accessing the queues. + */ + var interrupted = false + while (systemQueue.nonEmpty || queue.nonEmpty) { + while (systemQueue.nonEmpty) { + val msg = systemQueue.dequeue() + try cell.sendSystemMessage(msg) + catch { + case _: InterruptedException ⇒ interrupted = true + } + } + if (queue.nonEmpty) { + val envelope = queue.dequeue() + try cell.tell(envelope.message, envelope.sender) + catch { + case _: InterruptedException ⇒ interrupted = true + } + } + } + if (interrupted) throw new InterruptedException + } finally try + self.swapCell(cell) + finally + lock.unlock() + } + + def system: ActorSystem = systemImpl + def suspend(): Unit = {} + def resume(): Unit = {} + def restart(cause: Throwable): Unit = {} + def stop(): Unit = sendSystemMessage(Terminate()) + def isTerminated: Boolean = false + def parent: InternalActorRef = supervisor + def childrenRefs: ActorCell.ChildrenContainer = ActorCell.EmptyChildrenContainer + def tell(message: Any, sender: ActorRef): Unit = { + lock.lock() + try { + if (self.underlying eq this) queue enqueue Envelope(message, sender, system) + else self.underlying.tell(message, sender) + } finally { + lock.unlock() + } + } + def sendSystemMessage(msg: SystemMessage): Unit = { + lock.lock() + try { + if (self.underlying eq this) systemQueue enqueue msg + else self.underlying.sendSystemMessage(msg) + } finally { + lock.unlock() + } + } + def isLocal = true + def hasMessages: Boolean = { + lock.lock() + try { + if (self.underlying eq this) !queue.isEmpty + else self.underlying.hasMessages + } finally { + lock.unlock() + } + } + def numberOfMessages: Int = { + lock.lock() + try { + if (self.underlying eq this) queue.size + else self.underlying.numberOfMessages + } finally { + lock.unlock() + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 9bb560417b..1933015e88 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -592,7 +592,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( /** * Returns the akka.actor.Props representation of this TypedProps */ - def actorProps(): Props = if (dispatcher == Props().dispatcher) Props() else Props(dispatcher = dispatcher) + def actorProps(): Props = if (dispatcher == Props.default.dispatcher) Props.default else Props(dispatcher = dispatcher) } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 12eea14ffc..546373c33f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -16,8 +16,10 @@ import akka.event.Logging.LogEventException import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } import akka.util.{ Unsafe, Duration, NonFatal, Index } -final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { - if (message.isInstanceOf[AnyRef]) { +final case class Envelope private (val message: Any, val sender: ActorRef) + +object Envelope { + def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = { val msg = message.asInstanceOf[AnyRef] if (msg eq null) throw new InvalidMessageException("Message is null") if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { @@ -30,6 +32,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS } } } + new Envelope(message, sender) } } @@ -228,8 +231,8 @@ private[akka] object MessageDispatcher { } { val status = if (a.isTerminated) " (terminated)" else " (alive)" val messages = a match { - case l: LocalActorRef ⇒ " " + l.underlying.mailbox.numberOfMessages + " messages" - case _ ⇒ " " + a.getClass + case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages" + case _ ⇒ " " + a.getClass } val parent = a match { case i: InternalActorRef ⇒ ", parent: " + i.getParent @@ -265,7 +268,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext /** * Creates and returns a mailbox for the given actor. */ - protected[akka] def createMailbox(actor: ActorCell): Mailbox //FIXME should this really be private[akka]? + protected[akka] def createMailbox(actor: Cell): Mailbox //FIXME should this really be private[akka]? /** * Identifier of this dispatcher, corresponds to the full key diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 6beee3c9da..5b8c5209b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -9,6 +9,7 @@ import annotation.tailrec import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } +import akka.actor.ActorSystemImpl /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -46,24 +47,25 @@ class BalancingDispatcher( /** * INTERNAL USE ONLY */ - private[akka] val messageQueue: MessageQueue = mailboxType.create(None) + private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None) - private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue { + private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue) + extends Mailbox(_messageQueue) with DefaultSystemMessageQueue { override def cleanUp(): Unit = { - val dlq = actor.systemImpl.deadLetterMailbox + val dlq = system.deadLetterMailbox //Don't call the original implementation of this since it scraps all messages, and we don't want to do that var message = systemDrain(NoMessage) while (message ne null) { // message must be “virgin” before being able to systemEnqueue again val next = message.next message.next = null - dlq.systemEnqueue(actor.self, message) + dlq.systemEnqueue(system.deadLetters, message) message = next } } } - protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue) + protected[akka] override def createMailbox(actor: akka.actor.Cell): Mailbox = new SharingMailbox(actor.systemImpl, messageQueue) protected[akka] override def register(actor: ActorCell): Unit = { super.register(actor) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 3c17ab8db4..d382cc5ecc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -82,7 +82,8 @@ class Dispatcher( /** * INTERNAL USE ONLY */ - protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue + protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = + new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue /** * INTERNAL USE ONLY diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 25fc0250af..36b386cef1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -6,6 +6,7 @@ package akka.dispatch import akka.AkkaException import java.util.{ Comparator, PriorityQueue, Queue, Deque } import akka.util._ +import akka.actor.{ ActorCell, ActorRef, Cell } import java.util.concurrent._ import annotation.tailrec import akka.event.Logging.Error @@ -41,11 +42,32 @@ private[akka] object Mailbox { * * INTERNAL API */ -private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue) +private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable { import Mailbox._ + /* + * This is needed for actually executing the mailbox, i.e. invoking the + * ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox + * is constructed but we know that we will not execute it, in which case this + * will be null. It must be a var to support switching into an “active” + * mailbox, should the owning ActorRef turn local. + * + * ANOTHER THING, IMPORTANT: + * + * actorCell.start() publishes actorCell & self to the dispatcher, which + * means that messages may be processed theoretically before self’s constructor + * ends. The JMM guarantees visibility for final fields only after the end + * of the constructor, so safe publication requires that THIS WRITE BELOW + * stay as it is. + */ + @volatile + var actor: ActorCell = _ + def setActor(cell: ActorCell): Unit = actor = cell + + def dispatcher: MessageDispatcher = actor.dispatcher + /** * Try to enqueue the message to this queue, or throw an exception. */ @@ -230,11 +252,12 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes * if we closed the mailbox, we must dump the remaining system messages * to deadLetters (this is essential for DeathWatch) */ + val dlm = actor.systemImpl.deadLetterMailbox while (nextMessage ne null) { val msg = nextMessage nextMessage = nextMessage.next msg.next = null - try actor.systemImpl.deadLetterMailbox.systemEnqueue(actor.self, msg) + try dlm.systemEnqueue(actor.self, msg) catch { case NonFatal(e) ⇒ actor.system.eventStream.publish( Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage)) @@ -244,9 +267,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage) } - @inline - final def dispatcher: MessageDispatcher = actor.dispatcher - /** * Overridable callback to clean up the mailbox, * called when an actor is unregistered. @@ -265,7 +285,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes } if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() - messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue) + messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) } } @@ -303,7 +323,7 @@ trait MessageQueue { * which is passed in. The owner of this MessageQueue is passed in if * available (e.g. for creating DeadLetters()), “/deadletters” otherwise. */ - def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit } /** @@ -331,10 +351,11 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) - if (Mailbox.debug) println(actor.self + " having enqueued " + message) + if (Mailbox.debug) println(receiver + " having enqueued " + message) val head = systemQueueGet - if (head == NoMessage) actor.system.deadLetterMailbox.systemEnqueue(receiver, message) - else { + if (head == NoMessage) { + if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message) + } else { /* * this write is safely published by the compareAndSet contained within * systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec @@ -366,11 +387,11 @@ trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] def numberOfMessages = queue.size def hasMessages = !queue.isEmpty - def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = { + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { if (hasMessages) { var envelope = dequeue while (envelope ne null) { - deadLetters.enqueue(owner.self, envelope) + deadLetters.enqueue(owner, envelope) envelope = dequeue } } @@ -445,10 +466,20 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { } /** - * MailboxType is a factory to create MessageQueues for an optionally provided ActorContext + * MailboxType is a factory to create MessageQueues for an optionally + * provided ActorContext. + * + * Possibly Important Notice + * + * When implementing a custom mailbox type, be aware that there is special + * semantics attached to `system.actorOf()` in that sending to the returned + * ActorRef may—for a short period of time—enqueue the messages first in a + * dummy queue. Top-level actors are created in two steps, and only after the + * guardian actor has performed that second step will all previously sent + * messages be transferred from the dummy queue into the real mailbox. */ trait MailboxType { - def create(owner: Option[ActorContext]): MessageQueue + def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue } /** @@ -458,7 +489,7 @@ case class UnboundedMailbox() extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this() - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final def queue: Queue[Envelope] = this } @@ -475,7 +506,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { final def queue: BlockingQueue[Envelope] = this final val pushTimeOut = BoundedMailbox.this.pushTimeOut @@ -488,7 +519,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat */ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType { def this(cmp: Comparator[Envelope]) = this(cmp, 11) - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final def queue: Queue[Envelope] = this } @@ -503,7 +534,7 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { final def queue: BlockingQueue[Envelope] = this final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut @@ -517,7 +548,7 @@ case class UnboundedDequeBasedMailbox() extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this() - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics { final val queue = this } @@ -534,7 +565,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null") - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics { final val queue = this final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index bcd92794da..cb0f5ee09b 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -23,42 +23,28 @@ import scala.runtime.ScalaRunTime * send a message to on (or more) of these actors. */ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) - extends LocalActorRef( - _system, - _props.copy(creator = () ⇒ _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher), - _supervisor, - _path) { + extends RepointableActorRef(_system, _props, _supervisor, _path) { - /* - * CAUTION: RoutedActorRef is PROBLEMATIC - * ====================================== - * - * We are constructing/assembling the children outside of the scope of the - * Router actor, inserting them in its childrenRef list, which is not at all - * synchronized. This is done exactly once at start-up, all other accesses - * are done from the Router actor. This means that the only thing which is - * really hairy is making sure that the Router does not touch its childrenRefs - * before we are done with them: lock the monitor of the actor cell (hence the - * override of newActorCell) and use that to block the Router constructor for - * as long as it takes to setup the RoutedActorRef itself. - * - * ===> I M P O R T A N T N O T I C E <=== - * - * DO NOT THROW ANY EXCEPTIONS BEFORE THE FOLLOWING TRY-BLOCK WITHOUT - * EXITING THE MONITOR OF THE actorCell! - * - * This is important, just don’t do it! No kidding. - */ - override def newActorCell( - system: ActorSystemImpl, - ref: InternalActorRef, - props: Props, - supervisor: InternalActorRef): ActorCell = { - val cell = super.newActorCell(system, ref, props, supervisor) - Unsafe.instance.monitorEnter(cell) - cell + // verify that a BalancingDispatcher is not used with a Router + if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) { + throw new ConfigurationException( + "Configuration for " + this + + " is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") } + _props.routerConfig.verifyConfig() + + override def newCell(): Cell = new RoutedActorCell(system, this, props, supervisor) + +} + +private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef) + extends ActorCell( + _system, + _ref, + _props.copy(creator = () ⇒ _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher), + _supervisor) { + private[akka] val routerConfig = _props.routerConfig private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) private[akka] val resizeInProgress = new AtomicBoolean @@ -72,39 +58,28 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup private var _routeeProvider: RouteeProvider = _ def routeeProvider = _routeeProvider - val route = - try { - // verify that a BalancingDispatcher is not used with a Router - if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) { - actorContext.stop(actorContext.self) - throw new ConfigurationException( - "Configuration for actor [" + _path.toString + - "] is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") - } - - _routeeProvider = routerConfig.createRouteeProvider(actorContext) - val r = routerConfig.createRoute(routeeProps, routeeProvider) - // initial resize, before message send - routerConfig.resizer foreach { r ⇒ - if (r.isTimeForResize(resizeCounter.getAndIncrement())) - r.resize(routeeProps, routeeProvider) - } - r - } finally { - assert(Thread.holdsLock(actorContext)) - Unsafe.instance.monitorExit(actorContext) // unblock Router’s constructor + val route = { + _routeeProvider = routerConfig.createRouteeProvider(this) + val r = routerConfig.createRoute(routeeProps, routeeProvider) + // initial resize, before message send + routerConfig.resizer foreach { r ⇒ + if (r.isTimeForResize(resizeCounter.getAndIncrement())) + r.resize(routeeProps, routeeProvider) } + r + } if (routerConfig.resizer.isEmpty && _routees.isEmpty) throw new ActorInitializationException("router " + routerConfig + " did not register routees!") + start() + /* * end of construction */ def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Destination(this, this) :: Nil - case Terminated(_) ⇒ Destination(this, this) :: Nil + case _: AutoReceivedMessage ⇒ Destination(self, self) :: Nil case CurrentRoutees ⇒ sender ! RouterRoutees(_routees) Nil @@ -122,7 +97,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = { _routees = _routees ++ newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor - newRoutees foreach underlying.watch + newRoutees foreach watch } /** @@ -133,13 +108,13 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup */ private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]): Unit = { _routees = _routees diff abandonedRoutees - abandonedRoutees foreach underlying.unwatch + abandonedRoutees foreach unwatch } - override def !(message: Any)(implicit sender: ActorRef = null): Unit = { + override def tell(message: Any, sender: ActorRef): Unit = { resize() - val s = if (sender eq null) underlying.system.deadLetters else sender + val s = if (sender eq null) system.deadLetters else sender val msg = message match { case Broadcast(m) ⇒ m @@ -147,15 +122,18 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } applyRoute(s, message) match { - case Destination(_, x) :: Nil if x eq this ⇒ super.!(message)(s) - case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) + case Destination(_, x) :: Nil if x == self ⇒ super.tell(message, s) + case refs ⇒ + refs foreach (p ⇒ + if (p.recipient == self) super.tell(msg, p.sender) + else p.recipient.!(msg)(p.sender)) } } def resize(): Unit = { for (r ← routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) - super.!(Router.Resize) + super.tell(Router.Resize, self) } } } @@ -212,6 +190,11 @@ trait RouterConfig { */ def resizer: Option[Resizer] = None + /** + * Check that everything is there which is needed. Called in constructor of RoutedActorRef to fail early. + */ + def verifyConfig(): Unit = {} + } /** @@ -227,7 +210,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { * Not thread safe, but intended to be called from protected points, such as * `RouterConfig.createRoute` and `Resizer.resize`. */ - def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedRef.addRoutees(routees) + def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.addRoutees(routees) /** * Adds the routees to the router. @@ -247,7 +230,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { * Not thread safe, but intended to be called from protected points, such as * `Resizer.resize`. */ - def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedRef.removeRoutees(routees) + def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.removeRoutees(routees) def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { @@ -264,9 +247,9 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { /** * All routees of the router */ - def routees: IndexedSeq[ActorRef] = routedRef.routees + def routees: IndexedSeq[ActorRef] = routedCell.routees - private def routedRef = context.self.asInstanceOf[RoutedActorRef] + private def routedCell = context.asInstanceOf[RoutedActorCell] } /** @@ -298,12 +281,9 @@ trait CustomRoute { */ trait Router extends Actor { - // make sure that we synchronize properly to get the childrenRefs into our CPU cache - val ref = context.synchronized { - self match { - case x: RoutedActorRef ⇒ x - case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") - } + val ref = context match { + case x: RoutedActorCell ⇒ x + case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass) } final def receive = ({ @@ -417,8 +397,10 @@ class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) def this() = this(Dispatchers.DefaultDispatcherId) - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = - throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") + override def verifyConfig(): Unit = + throw new ConfigurationException("router needs external configuration from file (e.g. application.conf)") + + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy } @@ -774,9 +756,11 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def isProcessingMessage(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ - val cell = x.underlying - cell.mailbox.isScheduled && cell.currentMessage != null + case x: ActorRefWithCell ⇒ + x.underlying match { + case cell: ActorCell ⇒ cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } case _ ⇒ false } @@ -788,8 +772,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def hasMessages(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages - case _ ⇒ false + case x: ActorRefWithCell ⇒ x.underlying.hasMessages + case _ ⇒ false } /** @@ -799,8 +783,12 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def isSuspended(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.isSuspended - case _ ⇒ false + case x: ActorRefWithCell ⇒ + x.underlying match { + case cell: ActorCell ⇒ cell.mailbox.isSuspended + case _ ⇒ true + } + case _ ⇒ false } /** @@ -810,8 +798,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def numberOfMessages(a: ActorRef): Int = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages - case _ ⇒ 0 + case x: ActorRefWithCell ⇒ x.underlying.numberOfMessages + case _ ⇒ 0 } def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { @@ -1283,12 +1271,20 @@ case class DefaultResizer( */ def pressure(routees: IndexedSeq[ActorRef]): Int = { routees count { - case a: LocalActorRef ⇒ - val cell = a.underlying - pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages - case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null - case threshold ⇒ cell.mailbox.numberOfMessages >= threshold + case a: ActorRefWithCell ⇒ + a.underlying match { + case cell: ActorCell ⇒ + pressureThreshold match { + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages + case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null + case threshold ⇒ cell.mailbox.numberOfMessages >= threshold + } + case cell ⇒ + pressureThreshold match { + case 1 ⇒ cell.hasMessages + case i if i < 1 ⇒ true // unstarted cells are always busy, for example + case threshold ⇒ cell.numberOfMessages >= threshold + } } case x ⇒ false diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 64834178a8..ea3d8719cd 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -97,7 +97,7 @@ object Agent { */ class Agent[T](initialValue: T, system: ActorSystem) { private val ref = Ref(initialValue) - private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow? + private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow? /** * Read the internal state of the agent. diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 60b934a864..4da4dd6620 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -16,7 +16,7 @@ akka { seed-nodes = [] # how long to wait for one of the seed nodes to reply to initial join request - join-seed-node-timeout = 5s + seed-node-timeout = 5s # automatic join the seed-nodes at startup auto-join = on @@ -45,6 +45,10 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s + # A joining node stops sending heartbeats to the node to join if it hasn't become member + # of the cluster within this deadline. + join-timeout = 60s + failure-detector { # defines the failure detector threshold diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 6962fc10d6..c397d065e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -84,11 +84,7 @@ class AccrualFailureDetector( settings.FailureDetectorMaxSampleSize, settings.FailureDetectorAcceptableHeartbeatPause, settings.FailureDetectorMinStdDeviation, - // we use a conservative estimate for the first heartbeat because - // gossip needs to spread back to the joining node before the - // first real heartbeat is sent. Initial heartbeat is added when joining. - // FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed - settings.GossipInterval * 3 + settings.HeartbeatInterval, + settings.HeartbeatInterval, AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 0cf79d7102..1561ae7059 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -212,8 +212,8 @@ object MemberStatus { * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ case class GossipOverview( - seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock], - unreachable: Set[Member] = Set.empty[Member]) { + seen: Map[Address, VectorClock] = Map.empty, + unreachable: Set[Member] = Set.empty) { override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + @@ -259,7 +259,7 @@ object Gossip { case class Gossip( overview: GossipOverview = GossipOverview(), members: SortedSet[Member], // sorted set of members with their status, sorted by address - meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]], + meta: Map[String, Array[Byte]] = Map.empty, version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { @@ -375,7 +375,7 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) if (seedRoutees.nonEmpty) { - implicit val within = Timeout(cluster.clusterSettings.JoinSeedNodeTimeout) + implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( routees = seedRoutees, within = within.duration))) @@ -495,7 +495,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private case class State( latestGossip: Gossip, - memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener]) + joinInProgress: Map[Address, Deadline] = Map.empty, + memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty) if (!system.provider.isInstanceOf[RemoteActorRefProvider]) throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") @@ -710,11 +711,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ - def join(address: Address): Unit = { - val connection = clusterCommandConnectionFor(address) - val command = ClusterUserAction.Join(selfAddress) - log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) - connection ! command + @tailrec + final def join(address: Address): Unit = { + val localState = state.get + val newState = localState copy (joinInProgress = localState.joinInProgress + + (address -> (Deadline.now + JoinTimeout))) + if (!state.compareAndSet(localState, newState)) join(address) // recur + else { + val connection = clusterCommandConnectionFor(address) + val command = ClusterUserAction.Join(selfAddress) + log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) + connection ! command + } } /** @@ -929,7 +937,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localGossip = localState.latestGossip val winningGossip = - if (remoteGossip.version <> localGossip.version) { + if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { + // a fresh singleton cluster that is joining, no need to merge, use received gossip + remoteGossip + + } else if (remoteGossip.version <> localGossip.version) { // concurrent val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode @@ -949,7 +961,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) remoteGossip } - val newState = localState copy (latestGossip = winningGossip seen selfAddress) + val newJoinInProgress = + if (localState.joinInProgress.isEmpty) localState.joinInProgress + else localState.joinInProgress -- + winningGossip.members.map(_.address) -- + winningGossip.overview.unreachable.map(_.address) + + val newState = localState copy ( + latestGossip = winningGossip seen selfAddress, + joinInProgress = newJoinInProgress) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update @@ -1061,16 +1081,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * INTERNAL API. */ private[cluster] def heartbeat(): Unit = { + removeOverdueJoinInProgress() val localState = state.get - if (!isSingletonCluster(localState)) { - val liveMembers = localState.latestGossip.members.toIndexedSeq + val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys - for (member ← liveMembers; if member.address != selfAddress) { - val connection = clusterGossipConnectionFor(member.address) - log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) - connection ! selfHeartbeat - } + for (address ← beatTo; if address != selfAddress) { + val connection = clusterGossipConnectionFor(address) + log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) + connection ! selfHeartbeat } } @@ -1118,6 +1137,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + /** + * INTERNAL API. + * + * Removes overdue joinInProgress from State. + */ + @tailrec + final private[cluster] def removeOverdueJoinInProgress(): Unit = { + val localState = state.get + val overdueJoins = localState.joinInProgress collect { + case (address, deadline) if deadline.isOverdue ⇒ address + } + if (overdueJoins.nonEmpty) { + val newState = localState copy (joinInProgress = localState.joinInProgress -- overdueJoins) + if (!state.compareAndSet(localState, newState)) removeOverdueJoinInProgress() // recur + } + } + /** * INTERNAL API. * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index c026b8c1a0..08a9b5160d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -25,15 +25,17 @@ class ClusterSettings(val config: Config, val systemName: String) { final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr }.toIndexedSeq - final val JoinSeedNodeTimeout = Duration(getMilliseconds("akka.cluster.join-seed-node-timeout"), MILLISECONDS) - final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) - final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) - final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) - final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - final val AutoJoin = getBoolean("akka.cluster.auto-join") - final val AutoDown = getBoolean("akka.cluster.auto-down") - final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) - final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") + final val SeedNodeTimeout: Duration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS) + final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) + final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) + final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) + final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons") + final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") + final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") + final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) + final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) + final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala new file mode 100644 index 0000000000..256b7d563d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.util.Deadline + +object JoinInProgressMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + # simulate delay in gossip by turning it off + gossip-interval = 300 s + failure-detector { + threshold = 4 + acceptable-heartbeat-pause = 1 second + } + }""") // increase the leader action task interval + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec with AccrualFailureDetectorStrategy +class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec with AccrualFailureDetectorStrategy + +abstract class JoinInProgressSpec + extends MultiNodeSpec(JoinInProgressMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinInProgressMultiJvmSpec._ + + "A cluster node" must { + "send heartbeats immediately when joining to avoid false failure detection due to delayed gossip" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + enterBarrier("first-started") + + runOn(second) { + cluster.join(first) + } + + runOn(first) { + val until = Deadline.now + 5.seconds + while (!until.isOverdue) { + 200.millis.sleep + cluster.failureDetector.isAvailable(second) must be(true) + } + } + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 086c2fb00a..d661f0cc51 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -19,6 +19,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") val fifth = role("fifth") + // Note that this test uses default configuration, + // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.cluster { # FIXME remove this (use default) when ticket #2239 has been fixed diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 60594d145e..0376545b41 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -68,7 +68,7 @@ abstract class TransitionSpec } def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond { - memberStatus(address) == Up + memberStatus(address) == status } // DSL sugar for `role1 gossipTo role2` @@ -118,33 +118,18 @@ abstract class TransitionSpec awaitMembers(first, second) memberStatus(first) must be(Up) memberStatus(second) must be(Joining) + seenLatestGossip must be(Set(first)) cluster.convergence.isDefined must be(false) } enterBarrier("second-joined") first gossipTo second - runOn(second) { - members must be(Set(first, second)) - memberStatus(first) must be(Up) - memberStatus(second) must be(Joining) - // we got a conflicting version in second, and therefore not convergence in second - seenLatestGossip must be(Set(second)) - cluster.convergence.isDefined must be(false) - } - second gossipTo first - runOn(first) { - seenLatestGossip must be(Set(first, second)) - } - - first gossipTo second - runOn(second) { - seenLatestGossip must be(Set(first, second)) - } runOn(first, second) { memberStatus(first) must be(Up) memberStatus(second) must be(Joining) + seenLatestGossip must be(Set(first, second)) cluster.convergence.isDefined must be(true) } enterBarrier("convergence-joining-2") @@ -191,42 +176,20 @@ abstract class TransitionSpec second gossipTo first runOn(first) { members must be(Set(first, second, third)) - cluster.convergence.isDefined must be(false) memberStatus(third) must be(Joining) + seenLatestGossip must be(Set(first, second)) + cluster.convergence.isDefined must be(false) } first gossipTo third - runOn(third) { - members must be(Set(first, second, third)) - cluster.convergence.isDefined must be(false) - memberStatus(third) must be(Joining) - // conflicting version - seenLatestGossip must be(Set(third)) - } - third gossipTo first third gossipTo second - runOn(first, second) { - seenLatestGossip must be(Set(myself, third)) - } - - first gossipTo second - runOn(second) { - seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) - } - - runOn(first, third) { - cluster.convergence.isDefined must be(false) - } - - second gossipTo first - second gossipTo third runOn(first, second, third) { - seenLatestGossip must be(Set(first, second, third)) + members must be(Set(first, second, third)) memberStatus(first) must be(Up) memberStatus(second) must be(Up) memberStatus(third) must be(Joining) + seenLatestGossip must be(Set(first, second, third)) cluster.convergence.isDefined must be(true) } @@ -283,19 +246,21 @@ abstract class TransitionSpec "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { runOn(fourth) { cluster.join(fifth) - awaitMembers(fourth, fifth) - cluster.gossipTo(fifth) - awaitSeen(fourth, fifth) - cluster.convergence.isDefined must be(true) } runOn(fifth) { awaitMembers(fourth, fifth) - cluster.gossipTo(fourth) - awaitSeen(fourth, fifth) - cluster.gossipTo(fourth) + } + testConductor.enter("fourth-joined") + + fifth gossipTo fourth + fourth gossipTo fifth + + runOn(fourth, fifth) { + memberStatus(fourth) must be(Joining) + memberStatus(fifth) must be(Up) + seenLatestGossip must be(Set(fourth, fifth)) cluster.convergence.isDefined must be(true) } - enterBarrier("fourth-joined-fifth") enterBarrier("after-4") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index d5a9752e5e..92e219a540 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -22,12 +22,13 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) SeedNodes must be(Seq.empty[String]) - JoinSeedNodeTimeout must be(5 seconds) + SeedNodeTimeout must be(5 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) + JoinTimeout must be(60 seconds) NrOfGossipDaemons must be(4) AutoJoin must be(true) AutoDown must be(true) diff --git a/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java index 94e4b38121..ca5569657e 100644 --- a/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java @@ -24,6 +24,15 @@ import com.typesafe.config.Config; //#imports-prio-mailbox +//#imports-custom +import akka.dispatch.Envelope; +import akka.dispatch.MessageQueue; +import akka.dispatch.MailboxType; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +//#imports-custom + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -136,4 +145,32 @@ public class DispatcherDocTestBase { } } //#prio-mailbox + + //#mailbox-implementation-example + class MyUnboundedMailbox implements MailboxType { + + // This constructor signature must exist, it will be called by Akka + public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) { + // put your initialization code here + } + + // The create method is called to create the MessageQueue + public MessageQueue create(Option owner, Option system) { + return new MessageQueue() { + private final Queue queue = new ConcurrentLinkedQueue(); + + // these must be implemented; queue used as example + public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); } + public Envelope dequeue() { return queue.poll(); } + public int numberOfMessages() { return queue.size(); } + public boolean hasMessages() { return !queue.isEmpty(); } + public void cleanUp(ActorRef owner, MessageQueue deadLetters) { + for (Envelope handle: queue) { + deadLetters.enqueue(owner, handle); + } + } + }; + } + } + //#mailbox-implementation-example } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 2723883e9c..577740d78c 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -183,3 +183,46 @@ And then an example on how you would use it: the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it. +Creating your own Mailbox type +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An example is worth a thousand quacks: + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#imports-custom + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example + +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. + +.. note:: + + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. + + +Special Semantics of ``system.actorOf`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to make ``system.actorOf`` both synchronous and non-blocking while +keeping the return type :class:`ActorRef` (and the semantics that the returned +ref is fully functional), special handling takes place for this case. Behind +the scenes, a hollow kind of actor reference is constructed, which is sent to +the system’s guardian actor who actually creates the actor and its context and +puts those inside the reference. Until that has happened, messages sent to the +:class:`ActorRef` will be queued locally, and only upon swapping the real +filling in will they be transferred into the real mailbox. Thus, + +.. code-block:: scala + + final Props props = ... + // this actor uses MyCustomMailbox, which is assumed to be a singleton + system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang"); + assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang")); + +will probably fail; you will have to allow for some time to pass and retry the +check à la :meth:`TestKit.awaitCond`. + diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index ac911fd216..57dbaa5604 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -82,13 +82,6 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. -.. warning:: - - Creating top-level actors with ``system.actorOf`` is a blocking operation, - hence it may dead-lock due to starvation if the default dispatcher is - overloaded. To avoid problems, do not call this method from within actors or - futures which run on the default dispatcher. - Actors are automatically started asynchronously when created. When you create the ``UntypedActor`` then it will automatically call the ``preStart`` callback method on the ``UntypedActor`` class. This is an excellent place to diff --git a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala index b51c7bb170..fc62cd940d 100644 --- a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -11,7 +11,7 @@ import akka.actor.Props import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec -import akka.actor.Actor +import akka.actor.{ Actor, ExtendedActorSystem } class MyActor extends Actor { def receive = { @@ -56,20 +56,20 @@ import akka.util.duration._ class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { - override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MyMessageQueue(o) + override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = owner zip system headOption match { + case Some((o, s: ExtendedActorSystem)) ⇒ new MyMessageQueue(o, s) case None ⇒ throw new IllegalArgumentException( "requires an owner (i.e. does not work with BalancingDispatcher)") } } -class MyMessageQueue(_owner: ActorContext) - extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem) + extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { val storage = new QueueStorage // A real-world implmentation would use configuration to set the last // three parameters below - val breaker = CircuitBreaker(_owner.system.scheduler, 5, 30.seconds, 1.minute) + val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute) def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker { val data: Array[Byte] = serialize(envelope) @@ -91,7 +91,7 @@ class MyMessageQueue(_owner: ActorContext) * but the purpose of a durable mailbox is to continue * with the same message queue when the actor is started again. */ - def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () } //#custom-mailbox diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 9b2cb9a7e5..47a2318e53 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -76,13 +76,6 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. -.. warning:: - - Creating top-level actors with ``system.actorOf`` is a blocking operation, - hence it may dead-lock due to starvation if the default dispatcher is - overloaded. To avoid problems, do not call this method from within actors or - futures which run on the default dispatcher. - Actors are automatically started asynchronously when created. When you create the ``Actor`` then it will automatically call the ``preStart`` callback method on the ``Actor`` trait. This is an excellent place to diff --git a/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala index 3ff8d9c1ea..7fdd0cd9bf 100644 --- a/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -134,8 +134,8 @@ object DispatcherDocSpec { } //#mailbox-implementation-example - case class MyUnboundedMailbox() extends akka.dispatch.MailboxType { - import akka.actor.ActorContext + class MyUnboundedMailbox extends akka.dispatch.MailboxType { + import akka.actor.{ ActorRef, ActorSystem } import com.typesafe.config.Config import java.util.concurrent.ConcurrentLinkedQueue import akka.dispatch.{ @@ -149,12 +149,12 @@ object DispatcherDocSpec { def this(settings: ActorSystem.Settings, config: Config) = this() // The create method is called to create the MessageQueue - final override def create(owner: Option[ActorContext]): MessageQueue = + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } - //#mailbox-implementation-example } + //#mailbox-implementation-example } class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index cea9ee6e0a..4253d3a1e4 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -198,3 +198,25 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it. +Special Semantics of ``system.actorOf`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to make ``system.actorOf`` both synchronous and non-blocking while +keeping the return type :class:`ActorRef` (and the semantics that the returned +ref is fully functional), special handling takes place for this case. Behind +the scenes, a hollow kind of actor reference is constructed, which is sent to +the system’s guardian actor who actually creates the actor and its context and +puts those inside the reference. Until that has happened, messages sent to the +:class:`ActorRef` will be queued locally, and only upon swapping the real +filling in will they be transferred into the real mailbox. Thus, + +.. code-block:: scala + + val props: Props = ... + // this actor uses MyCustomMailbox, which is assumed to be a singleton + system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang" + assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang") + +will probably fail; you will have to allow for some time to pass and retry the +check à la :meth:`TestKit.awaitCond`. + diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index fccb6b5aea..8d2ce5b897 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -13,20 +13,22 @@ import akka.actor.ActorSystem import akka.dispatch._ import akka.util.{ Duration, NonFatal } import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker } +import akka.actor.ExtendedActorSystem class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { private val settings = new FileBasedMailboxSettings(systemSettings, config) - override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMessageQueue(o, settings) - case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = owner zip system headOption match { + case Some((o, s: ExtendedActorSystem)) ⇒ new FileBasedMessageQueue(o, s, settings) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings) + extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? private val log = Logging(system, "FileBasedMessageQueue") - val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + val breaker = CircuitBreaker(system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) private val queue = try { (new java.io.File(settings.QueuePath)) match { @@ -79,5 +81,5 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox case NonFatal(_) ⇒ false } - def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index b21878d00e..e3bb5858f7 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -13,11 +13,10 @@ private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } -abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue { +abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue { import DurableExecutableMailboxConfig._ - def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] - def ownerPath: ActorPath = owner.self.path + def ownerPath: ActorPath = owner.path val ownerPathString: String = ownerPath.elements.mkString("/") val name: String = "mailbox_" + Name.replaceAllIn(ownerPathString, "_") @@ -42,7 +41,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef]) val builder = RemoteMessageProtocol.newBuilder .setMessage(message) - .setRecipient(serializeActorRef(owner.self)) + .setRecipient(serializeActorRef(owner)) .setSender(serializeActorRef(durableMessage.sender)) builder.build.toByteArray @@ -60,7 +59,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ val message = MessageSerializer.deserialize(system, durableMessage.getMessage) val sender = deserializeActorRef(durableMessage.getSender) - Envelope(message, sender)(system) + Envelope(message, sender, system) } } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 9081a5fcb0..8264bd0348 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -3,25 +3,21 @@ */ package akka.actor.mailbox -import DurableMailboxSpecActorFactory.AccumulatorActor -import DurableMailboxSpecActorFactory.MailboxTestActor -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.LocalActorRef -import akka.actor.Props -import akka.actor.actorRef2Scala +import java.io.InputStream +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec + +import org.scalatest.{ WordSpec, BeforeAndAfterAll } +import org.scalatest.matchers.MustMatchers + +import com.typesafe.config.{ ConfigFactory, Config } + +import DurableMailboxSpecActorFactory.{ MailboxTestActor, AccumulatorActor } +import akka.actor.{ RepointableRef, Props, ActorSystem, ActorRefWithCell, ActorRef, ActorCell, Actor } import akka.dispatch.Mailbox import akka.testkit.TestKit import akka.util.duration.intToDurationInt -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import java.io.InputStream -import java.util.concurrent.TimeoutException -import org.scalatest.BeforeAndAfterAll -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import scala.annotation.tailrec object DurableMailboxSpecActorFactory { @@ -115,9 +111,15 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) } - def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = id match { - case null | "" ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher")) - case some ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some) + def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = { + val ref = id match { + case null | "" ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher")) + case some ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some) + } + awaitCond(ref match { + case r: RepointableRef ⇒ r.isStarted + }, 1 second, 10 millis) + ref } private def isDurableMailbox(m: Mailbox): Boolean = @@ -127,9 +129,11 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) "get a new, unique, durable mailbox" in { val a1, a2 = createMailboxTestActor() - isDurableMailbox(a1.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) - isDurableMailbox(a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) - (a1.asInstanceOf[LocalActorRef].underlying.mailbox ne a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) + val mb1 = a1.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox + val mb2 = a2.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox + isDurableMailbox(mb1) must be(true) + isDurableMailbox(mb2) must be(true) + (mb1 ne mb2) must be(true) } "deliver messages at most once" in { @@ -148,7 +152,7 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) "support having multiple actors at the same time" in { val actors = Vector.fill(3)(createMailboxTestActor(Props[AccumulatorActor])) - actors foreach { a ⇒ isDurableMailbox(a.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) } + actors foreach { a ⇒ isDurableMailbox(a.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox) must be(true) } val msgs = 1 to 3 diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index eaecf67792..cdf9ad9d70 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -36,8 +36,8 @@ private[akka] class RemoteActorRefProvider( // these are only available after init() override def rootGuardian: InternalActorRef = local.rootGuardian - override def guardian: InternalActorRef = local.guardian - override def systemGuardian: InternalActorRef = local.systemGuardian + override def guardian: LocalActorRef = local.guardian + override def systemGuardian: LocalActorRef = local.systemGuardian override def terminationFuture: Promise[Unit] = local.terminationFuture override def dispatcher: MessageDispatcher = local.dispatcher override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) @@ -96,8 +96,8 @@ private[akka] class RemoteActorRefProvider( } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, - systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = { - if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy) + systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = { + if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async) else { /* @@ -155,14 +155,14 @@ private[akka] class RemoteActorRefProvider( Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒ if (addr == rootPath.address || addr == transport.address) { - local.actorOf(system, props, supervisor, path, false, deployment.headOption, false) + local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements useActorOnNode(rpath, props, d, supervisor) new RemoteActorRef(this, transport, rpath, supervisor) } - case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false) + case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index ddab54b2ad..53023687c0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -5,10 +5,11 @@ package akka.remote import scala.annotation.tailrec - import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } import akka.event.LoggingAdapter import akka.dispatch.Watch +import akka.actor.ActorRefWithCell +import akka.actor.ActorRefScope private[akka] sealed trait DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg @@ -60,7 +61,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath val subpath = elems.drop(1) val path = this.path / subpath val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], - path, false, Some(deploy), true) + path, systemService = false, Some(deploy), lookupDeploy = true, async = false) addChild(subpath.mkString("/"), actor) this.sendSystemMessage(Watch(actor, this)) case _ ⇒ @@ -68,11 +69,12 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath } } - case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ + removeChild(child.path.elements.drop(1).mkString("/")) - case t: Terminated ⇒ + case t: Terminated ⇒ - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index ecd59c40e0..c48cc430f2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -270,14 +270,14 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re } case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) } - case l: LocalRef ⇒ + case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) remoteMessage.payload match { case msg: PossiblyHarmful if useUntrustedMode ⇒ log.warning("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass) case msg: SystemMessage ⇒ l.sendSystemMessage(msg) case msg ⇒ l.!(msg)(remoteMessage.sender) } - case r: RemoteRef ⇒ + case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal ⇒ if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage) remoteMessage.originalReceiver match { case AddressFromURIString(address) if address == provider.transport.address ⇒ @@ -285,7 +285,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re r.!(remoteMessage.payload)(remoteMessage.sender) case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } - case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) + case r ⇒ log.error("dropping message {} for unknown recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } } } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 9a71f309fc..a0b7ae4a49 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -71,7 +71,8 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _re IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + i val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy), false) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + systemService = false, Some(deploy), lookupDeploy = false, async = false) }) case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index ac4127fe17..7f92e3089b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -118,7 +118,7 @@ akka { val r = expectMsgType[ActorRef] r ! (Props[Echo], "grandchild") val remref = expectMsgType[ActorRef] - remref.isInstanceOf[LocalActorRef] must be(true) + remref.asInstanceOf[ActorRefScope].isLocal must be(true) val myref = system.actorFor(system / "looker" / "child" / "grandchild") myref.isInstanceOf[RemoteActorRef] must be(true) myref ! 43 diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 2fe664d7b6..1732d5faf3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -128,7 +128,7 @@ class CallingThreadDispatcher( override def id: String = Id - protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType) + protected[akka] override def createMailbox(actor: akka.actor.Cell) = new CallingThreadMailbox(actor, mailboxType) protected[akka] override def shutdown() {} @@ -281,17 +281,21 @@ class NestingQueue(val q: MessageQueue) { def isActive = active } -class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue { +class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxType) + extends Mailbox(null) with DefaultSystemMessageQueue { + + val system = _receiver.system + val self = _receiver.self private val q = new ThreadLocal[NestingQueue]() { override def initialValue = { - val queue = new NestingQueue(mailboxType.create(Some(actor))) - CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue) + val queue = new NestingQueue(mailboxType.create(Some(self), Some(system))) + CallingThreadDispatcherQueues(system).registerQueue(CallingThreadMailbox.this, queue) queue } } - override def enqueue(receiver: ActorRef, msg: Envelope): Unit = throw new UnsupportedOperationException("CallingThreadMailbox cannot enqueue normally") + override def enqueue(receiver: ActorRef, msg: Envelope): Unit = q.get.q.enqueue(receiver, msg) override def dequeue(): Envelope = throw new UnsupportedOperationException("CallingThreadMailbox cannot dequeue normally") override def hasMessages: Boolean = q.get.q.hasMessages override def numberOfMessages: Int = 0 @@ -311,7 +315,7 @@ class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) e val q = queue CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q) super.cleanUp() - q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue) + q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) } } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index f8efe4e2e5..73658cf985 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -56,7 +56,7 @@ class TestActorRef[T <: Actor]( * become/unbecome. */ def receive(o: Any, sender: ActorRef): Unit = try { - underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender)(underlying.system) + underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender, underlying.system) underlying.receiveMessage(o) } finally underlying.currentMessage = null diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index b3c1626536..5f75ba8dfa 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -97,9 +97,14 @@ trait TestKitBase { */ lazy val testActor: ActorRef = { val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - impl.systemActorOf(Props(new TestActor(queue)) + val ref = impl.systemActorOf(Props(new TestActor(queue)) .withDispatcher(CallingThreadDispatcher.Id), "testActor" + TestKit.testActorId.incrementAndGet) + awaitCond(ref match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + }, 1 second, 10 millis) + ref } private var end: Duration = Duration.Undefined diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 71b7b185f0..e1b1ba4ddf 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -190,29 +190,24 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A } @tailrec private def doPoll(mode: PollMsg, togo: Int = 10): Unit = - receiveMessage(mode) match { - case null ⇒ // receiveMessage has already done something special here - case Seq() ⇒ doPollTimeout(mode) - case frames ⇒ - notifyListener(deserializer(frames)) - if (togo > 0) doPoll(mode, togo - 1) - else self ! mode + if (togo <= 0) self ! mode + else receiveMessage(mode) match { + case Seq() ⇒ doPollTimeout(mode) + case frames ⇒ notifyListener(deserializer(frames)); doPoll(mode, togo - 1) } - @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = { - val result = mode match { - case Poll ⇒ socket.recv(JZMQ.NOBLOCK) - case PollCareful ⇒ if (poller.poll(0) > 0) socket.recv(0) else null + @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = + if (mode == PollCareful && (poller.poll(0) <= 0)) { + if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!") + } else { + socket.recv(if (mode == Poll) JZMQ.NOBLOCK else 0) match { + case null ⇒ /*EAGAIN*/ + if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames) + case bytes ⇒ + val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes) + if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames + } } - result match { - case null ⇒ - if (currentFrames.isEmpty) currentFrames - else throw new IllegalStateException("no more frames available while socket.hasReceivedMore==true") - case bytes ⇒ - val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes) - if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames - } - } private val listenerOpt = params collectFirst { case Listener(l) ⇒ l } private def watchListener(): Unit = listenerOpt foreach context.watch diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 4213f65611..584625cc82 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -485,7 +485,7 @@ object Dependency { object V { val Camel = "2.8.0" val Logback = "1.0.4" - val Netty = "3.5.0.Final" + val Netty = "3.5.1.Final" val Protobuf = "2.4.1" val ScalaStm = "0.5" val Scalatest = "1.6.1" diff --git a/repl b/repl new file mode 100644 index 0000000000..701b021b35 --- /dev/null +++ b/repl @@ -0,0 +1,9 @@ +import akka.actor._ +import akka.dispatch.{ Future, Promise } +import com.typesafe.config.ConfigFactory +val config=ConfigFactory.parseString("akka.daemonic=on") +val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem] +implicit val ec=sys.dispatcher +import akka.util.duration._ +import akka.util.Timeout +implicit val timeout=Timeout(5 seconds)