Merge branch 'master' into wip-2230-race-in-test-conductor-shutdown-ban

This commit is contained in:
Björn Antonsson 2012-06-25 14:40:12 +02:00
commit e2037e254b
56 changed files with 1319 additions and 490 deletions

View file

@ -227,7 +227,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
contextStackMustBeEmpty
}
filterException[java.lang.IllegalStateException] {
EventFilter[ActorInitializationException](occurrences = 1) intercept {
(intercept[java.lang.IllegalStateException] {
wrap(result
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result)))))))
@ -257,14 +257,14 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val readA = in.readObject
a.isInstanceOf[LocalActorRef] must be === true
readA.isInstanceOf[LocalActorRef] must be === true
a.isInstanceOf[ActorRefWithCell] must be === true
readA.isInstanceOf[ActorRefWithCell] must be === true
(readA eq a) must be === true
}
val ser = new JavaSerializer(esys)
val readA = ser.fromBinary(bytes, None)
readA.isInstanceOf[LocalActorRef] must be === true
readA.isInstanceOf[ActorRefWithCell] must be === true
(readA eq a) must be === true
}
@ -369,13 +369,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor {
def receive = {
case 5 sender.tell("five")
case null sender.tell("null")
case 5 sender.tell("five")
case 0 sender.tell("null")
}
}))
val ffive = (ref.ask(5)(timeout)).mapTo[String]
val fnull = (ref.ask(null)(timeout)).mapTo[String]
val fnull = (ref.ask(0)(timeout)).mapTo[String]
ref ! PoisonPill
Await.result(ffive, timeout.duration) must be("five")

View file

@ -10,6 +10,9 @@ import akka.dispatch.Await
import akka.util.duration._
import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
import akka.pattern.ask
import akka.util.Timeout
import akka.dispatch.Future
class JavaExtensionSpec extends JavaExtension with JUnitSuite
@ -21,8 +24,46 @@ object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider
// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
class TestExtension(val system: ExtendedActorSystem) extends Extension
object ActorSystemSpec {
class Waves extends Actor {
var master: ActorRef = _
var terminaters = Set[ActorRef]()
def receive = {
case n: Int
master = sender
terminaters = Set() ++ (for (i 1 to n) yield {
val man = context.watch(context.system.actorOf(Props[Terminater]))
man ! "run"
man
})
case Terminated(child) if terminaters contains child
terminaters -= child
if (terminaters.isEmpty) {
master ! "done"
context stop self
}
}
override def preRestart(cause: Throwable, msg: Option[Any]) {
if (master ne null) {
master ! "failed with " + cause + " while processing " + msg
}
context stop self
}
}
class Terminater extends Actor {
def receive = {
case "run" context.stop(self)
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender {
"An ActorSystem" must {
@ -112,6 +153,35 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
}.getMessage must be("Must be called prior to system shutdown.")
}
"reliably create waves of actors" in {
import system.dispatcher
implicit val timeout = Timeout(30 seconds)
val waves = for (i 1 to 3) yield system.actorOf(Props[ActorSystemSpec.Waves]) ? 50000
Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done")
}
"reliable deny creation of actors while shutting down" in {
val system = ActorSystem()
system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
var failing = false
var created = Vector.empty[ActorRef]
while (!system.isTerminated && system.uptime < 5) {
try {
val t = system.actorOf(Props[ActorSystemSpec.Terminater])
failing must not be true // because once failing => always failing (its due to shutdown)
created :+= t
} catch {
case _: IllegalStateException failing = true
}
}
if (system.uptime >= 5) {
println(created.last)
println(system.asInstanceOf[ExtendedActorSystem].printTree)
system.uptime must be < 5L
}
created filter (ref !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq())
}
}
}
}

View file

@ -143,6 +143,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
result must be(Seq(1, 2, 3))
}
}
"be able to watch a child with the same name after the old died" in {
val parent = system.actorOf(Props(new Actor {
def receive = {
case "NKOTB"
val currentKid = context.watch(context.actorOf(Props(ctx { case "NKOTB" ctx stop ctx.self }), "kid"))
currentKid forward "NKOTB"
context become {
case Terminated(`currentKid`)
testActor ! "GREEN"
context unbecome
}
}
}))
parent ! "NKOTB"
expectMsg("GREEN")
parent ! "NKOTB"
expectMsg("GREEN")
}
}
}

View file

@ -140,13 +140,13 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
object FSMTimingSpec {
def suspend(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.suspend()
case _
case l: ActorRefWithCell l.suspend()
case _
}
def resume(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.resume()
case _
case l: ActorRefWithCell l.resume()
case _
}
trait State

View file

@ -3,24 +3,23 @@
*/
package akka.actor.dispatch
import org.scalatest.Assertions._
import akka.testkit._
import akka.dispatch._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.util.Switch
import java.rmi.RemoteException
import org.junit.{ After, Test }
import akka.actor._
import util.control.NoStackTrace
import akka.actor.ActorSystem
import akka.util.duration._
import akka.event.Logging.Error
import java.util.concurrent.{ TimeUnit, CountDownLatch, ConcurrentHashMap }
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }
import org.junit.runner.RunWith
import org.scalatest.Assertions.{ fail, assert }
import org.scalatest.junit.JUnitRunner
import com.typesafe.config.Config
import akka.util.Duration
import akka.actor._
import akka.dispatch._
import akka.event.Logging.Error
import akka.pattern.ask
import akka.testkit._
import akka.util.{ Timeout, Switch, Duration }
import akka.util.duration._
object ActorModelSpec {
@ -201,7 +200,7 @@ object ActorModelSpec {
msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher))
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher))
val deadline = System.currentTimeMillis + 1000
try {
await(deadline)(stats.suspensions.get() == suspensions)
@ -241,6 +240,13 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher))
def awaitStarted(ref: ActorRef): Unit = {
awaitCond(ref match {
case r: RepointableRef r.isStarted
case _ true
}, 1 second, 10 millis)
}
protected def interceptedDispatcher(): MessageDispatcherInterceptor
protected def dispatcherType: String
@ -280,6 +286,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
implicit val dispatcher = interceptedDispatcher()
val start, oneAtATime = new CountDownLatch(1)
val a = newTestActor(dispatcher.id)
awaitStarted(a)
a ! CountDown(start)
assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
@ -328,7 +335,8 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
"not process messages for a suspended actor" in {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef]
val a = newTestActor(dispatcher.id).asInstanceOf[InternalActorRef]
awaitStarted(a)
val done = new CountDownLatch(1)
a.suspend
a ! CountDown(done)
@ -436,6 +444,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
"not double-deregister" in {
implicit val dispatcher = interceptedDispatcher()
for (i 1 to 1000) system.actorOf(Props.empty)
val a = newTestActor(dispatcher.id)
a ! DoubleStop
awaitCond(statsFor(a, dispatcher).registers.get == 1)

View file

@ -1,8 +1,12 @@
package akka.actor.dispatch
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.{ Mailbox, Dispatchers }
import akka.actor.{ LocalActorRef, IllegalActorStateException, Actor, Props }
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.actor.{ Props, ActorRefWithCell, ActorCell, Actor }
import akka.dispatch.Mailbox
import akka.testkit.AkkaSpec
object BalancingDispatcherSpec {
@ -51,8 +55,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) {
"have fast actor stealing work from slow actor" in {
val finishedCounter = new CountDownLatch(110)
val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef]
val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef]
val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell]
val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell]
var sentToFast = 0
@ -76,11 +80,11 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) {
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
fast.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
slow.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount)
system.stop(slow)
system.stop(fast)
}

View file

@ -1,13 +1,17 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue }
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import com.typesafe.config.Config
import akka.actor._
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
import akka.testkit.AkkaSpec
import akka.util.duration.intToDurationInt
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -75,7 +79,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
result
}
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system)
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters, system)
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
q must not be null
@ -136,8 +140,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case u: UnboundedMailbox u.create(None)
case b: BoundedMailbox b.create(None)
case u: UnboundedMailbox u.create(None, None)
case b: BoundedMailbox b.create(None, None)
}
}
@ -145,8 +149,8 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox() new UnboundedPriorityMailbox(comparator).create(None)
case BoundedMailbox(capacity, pushTimeOut) new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None)
case UnboundedMailbox() new UnboundedPriorityMailbox(comparator).create(None, None)
case BoundedMailbox(capacity, pushTimeOut) new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None, None)
}
}
@ -158,13 +162,13 @@ object CustomMailboxSpec {
"""
class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]) = owner match {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = owner match {
case Some(o) new MyMailbox(o)
case None throw new Exception("no mailbox owner given")
}
}
class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
class MyMailbox(owner: ActorRef) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@ -174,7 +178,11 @@ class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must {
"support custom mailboxType" in {
val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher"))
val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue
awaitCond(actor match {
case r: RepointableRef r.isStarted
case _ true
}, 1 second, 10 millis)
val queue = actor.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox.messageQueue
queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox])
}
}

View file

@ -1,12 +1,14 @@
package akka.dispatch
import akka.actor.{ Props, LocalActorRef, Actor }
import akka.testkit.AkkaSpec
import akka.pattern.ask
import akka.util.duration._
import akka.testkit.DefaultTimeout
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.actor.{ Props, InternalActorRef, ActorSystem, Actor }
import akka.pattern.ask
import akka.testkit.{ DefaultTimeout, AkkaSpec }
import akka.util.duration.intToDurationInt
object PriorityDispatcherSpec {
val config = """
@ -54,7 +56,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
case i: Int acc = i :: acc
case 'Result sender.tell(acc)
}
}).withDispatcher(dispatcherKey)).asInstanceOf[LocalActorRef]
}).withDispatcher(dispatcherKey)).asInstanceOf[InternalActorRef]
actor.suspend //Make sure the actor isn't treating any messages, let it buffer the incoming messages

View file

@ -4,15 +4,14 @@
package akka.routing
import java.util.concurrent.atomic.AtomicInteger
import org.junit.runner.RunWith
import akka.actor.{ Props, LocalActorRef, Deploy, Actor, ActorRef }
import akka.actor.{ Props, Deploy, Actor, ActorRef }
import akka.ConfigurationException
import akka.dispatch.Await
import akka.pattern.{ ask, gracefulStop }
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
import akka.util.duration.intToDurationInt
import akka.actor.UnstartedCell
object ConfiguredLocalRoutingSpec {
val config = """
@ -47,6 +46,14 @@ object ConfiguredLocalRoutingSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender {
def routerConfig(ref: ActorRef): RouterConfig = ref match {
case r: RoutedActorRef
r.underlying match {
case c: RoutedActorCell c.routerConfig
case _: UnstartedCell awaitCond(r.isStarted, 1 second, 10 millis); routerConfig(ref)
}
}
"RouterConfig" must {
"be picked up from Props" in {
@ -55,7 +62,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
case "get" sender ! context.props
}
}).withRouter(RoundRobinRouter(12)), "someOther")
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12)
routerConfig(actor) must be === RoundRobinRouter(12)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
@ -65,7 +72,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
case "get" sender ! context.props
}
}).withRouter(RoundRobinRouter(12)), "config")
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4)
routerConfig(actor) must be === RandomRouter(4)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
@ -75,7 +82,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
case "get" sender ! context.props
}
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther")
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12)
routerConfig(actor) must be === RoundRobinRouter(12)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
@ -85,7 +92,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
case "get" sender ! context.props
}
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config")
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4)
routerConfig(actor) must be === RandomRouter(4)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}

View file

@ -12,10 +12,11 @@ import akka.dispatch.Await
import akka.util.Duration
import akka.ConfigurationException
import com.typesafe.config.ConfigFactory
import akka.pattern.ask
import akka.pattern.{ ask, pipe }
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import akka.util.Timeout
object RoutingSpec {
@ -171,6 +172,18 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
expectMsg("restarted")
}
"must start in-line for context.actorOf()" in {
system.actorOf(Props(new Actor {
def receive = {
case "start"
context.actorOf(Props(new Actor {
def receive = { case x sender ! x }
}).withRouter(RoundRobinRouter(2))) ? "hello" pipeTo sender
}
})) ! "start"
expectMsg("hello")
}
}
"no router" must {
@ -528,7 +541,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
}
"support custom router" in {
val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter")
val myrouter = system.actorOf(Props.empty.withRouter(FromConfig), "myrouter")
myrouter.isTerminated must be(false)
}
}
@ -540,7 +553,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
"count votes as intended - not as in Florida" in {
val routedActor = system.actorOf(Props().withRouter(VoteCountRouter()))
val routedActor = system.actorOf(Props.empty.withRouter(VoteCountRouter()))
routedActor ! DemocratVote
routedActor ! DemocratVote
routedActor ! RepublicanVote

View file

@ -8,10 +8,14 @@ import akka.util.Unsafe;
final class AbstractActorCell {
final static long mailboxOffset;
final static long childrenOffset;
final static long nextNameOffset;
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import akka.util.Unsafe;
final class AbstractActorRef {
final static long cellOffset;
static {
try {
cellOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_cellDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
}

View file

@ -58,7 +58,7 @@ case object Kill extends Kill {
/**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
*/
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean)
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
abstract class ReceiveTimeout extends PossiblyHarmful
@ -134,8 +134,7 @@ class ActorInitializationException private[akka] (actor: ActorRef, message: Stri
* there might be more of them in the future, or not.
*/
class InvalidMessageException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause)
with NoStackTrace {
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}

View file

@ -15,6 +15,7 @@ import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
import java.util.concurrent.atomic.AtomicLong
//TODO: everything here for current compatibility - could be limited more
@ -167,6 +168,78 @@ trait UntypedActorContext extends ActorContext {
}
/**
* INTERNAL API
*/
private[akka] trait Cell {
/**
* The self reference which this Cell is attached to.
*/
def self: ActorRef
/**
* The system within which this Cell lives.
*/
def system: ActorSystem
/**
* The system internals where this Cell lives.
*/
def systemImpl: ActorSystemImpl
/**
* Recursively suspend this actor and all its children.
*/
def suspend(): Unit
/**
* Recursively resume this actor and all its children.
*/
def resume(): Unit
/**
* Restart this actor (will recursively restart or stop all children).
*/
def restart(cause: Throwable): Unit
/**
* Recursively terminate this actor and all its children.
*/
def stop(): Unit
/**
* Returns true if the actor is locally known to be terminated, false if
* alive or uncertain.
*/
def isTerminated: Boolean
/**
* The supervisor of this actor.
*/
def parent: InternalActorRef
/**
* All children of this actor, including only reserved-names.
*/
def childrenRefs: ActorCell.ChildrenContainer
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
*/
def tell(message: Any, sender: ActorRef): Unit
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
*/
def sendSystemMessage(msg: SystemMessage): Unit
/**
* Returns true if the actor is local, i.e. if it is actually scheduled
* on a Thread in the current JVM when run.
*/
def isLocal: Boolean
/**
* If the actor isLocal, returns whether messages are currently queued,
* false otherwise.
*/
def hasMessages: Boolean
/**
* If the actor isLocal, returns the number of messages currently queued,
* which may be a costly operation, 0 otherwise.
*/
def numberOfMessages: Int
}
/**
* Everything in here is completely Akka PRIVATE. You will not find any
* supported APIs in this place. This is not the API you were looking
@ -201,10 +274,18 @@ private[akka] object ActorCell {
def children: Iterable[ActorRef]
def stats: Iterable[ChildRestartStats]
def shallDie(actor: ActorRef): ChildrenContainer
/**
* reserve that name or throw an exception
*/
def reserve(name: String): ChildrenContainer
/**
* cancel a reservation
*/
def unreserve(name: String): ChildrenContainer
}
trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = TreeMap.empty[String, ChildRestartStats]
val emptyStats = TreeMap.empty[String, ChildStats]
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = this
@ -213,6 +294,8 @@ private[akka] object ActorCell {
def children: Iterable[ActorRef] = Nil
def stats: Iterable[ChildRestartStats] = Nil
def shallDie(actor: ActorRef): ChildrenContainer = this
def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
def unreserve(name: String): ChildrenContainer = this
override def toString = "no children"
}
@ -228,6 +311,8 @@ private[akka] object ActorCell {
*/
object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
}
/**
@ -236,32 +321,46 @@ private[akka] object ActorCell {
* calling context.stop(child) and processing the ChildTerminated() system
* message).
*/
class NormalChildrenContainer(c: TreeMap[String, ChildRestartStats]) extends ChildrenContainer {
class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
def getByName(name: String): Option[ChildRestartStats] = c get name
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match {
case c @ Some(crs) if (crs.child == actor) c
case _ None
def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def children: Iterable[ActorRef] = c.values.view.map(_.child)
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def stats: Iterable[ChildRestartStats] = c.values
def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats c }
def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
def reserve(name: String): ChildrenContainer =
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else new NormalChildrenContainer(c.updated(name, ChildNameReserved))
def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) NormalChildrenContainer(c - name)
case _ this
}
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children:\n ", "\n ", "")
}
object NormalChildrenContainer {
def apply(c: TreeMap[String, ChildRestartStats]): ChildrenContainer =
def apply(c: TreeMap[String, ChildStats]): ChildrenContainer =
if (c.isEmpty) EmptyChildrenContainer
else new NormalChildrenContainer(c)
}
@ -276,7 +375,7 @@ private[akka] object ActorCell {
* type of container, depending on whether or not children are left and whether or not
* the reason was Terminating.
*/
case class TerminatingChildrenContainer(c: TreeMap[String, ChildRestartStats], toDie: Set[ActorRef], reason: SuspendReason)
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child)))
@ -290,19 +389,35 @@ private[akka] object ActorCell {
else copy(c - child.path.name, t)
}
def getByName(name: String): Option[ChildRestartStats] = c get name
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match {
case c @ Some(crs) if (crs.child == actor) c
case _ None
def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def children: Iterable[ActorRef] = c.values.view.map(_.child)
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def stats: Iterable[ChildRestartStats] = c.values
def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats c }
def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
def reserve(name: String): ChildrenContainer = reason match {
case Termination throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating")
case _
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else copy(c = c.updated(name, ChildNameReserved))
}
def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) copy(c = c - name)
case _ this
}
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie
@ -316,10 +431,13 @@ private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
val props: Props,
@volatile var parent: InternalActorRef) extends UntypedActorContext {
import AbstractActorCell.mailboxOffset
@volatile var parent: InternalActorRef) extends UntypedActorContext with Cell {
import AbstractActorCell.{ mailboxOffset, childrenOffset, nextNameOffset }
import ActorCell._
final def isLocal = true
final def systemImpl = system
protected final def guardian = self
@ -353,7 +471,46 @@ private[akka] class ActorCell(
var receiveTimeoutData: (Long, Cancellable) = emptyReceiveTimeoutData
@volatile
var childrenRefs: ChildrenContainer = EmptyChildrenContainer
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer]
private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren)
@tailrec private def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@tailrec private def unreserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
@tailrec private def addChild(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || addChild(ref)
}
@tailrec private def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
@tailrec private def removeChild(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else removeChild(ref)
}
@tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = {
childrenRefs match {
case c: TerminatingChildrenContainer swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
case _ false
}
}
private def isTerminating = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination) true
@ -365,7 +522,7 @@ private[akka] class ActorCell(
case _ true
}
private def _actorOf(props: Props, name: String): ActorRef = {
private def _actorOf(props: Props, name: String, async: Boolean): ActorRef = {
if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.serialize(props.creator) match {
@ -376,53 +533,74 @@ private[akka] class ActorCell(
}
}
}
// in case we are currently terminating, swallow creation requests and return EmptyLocalActorRef
if (isTerminating) provider.actorFor(self, Seq(name))
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)
*/
if (isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
else {
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true)
childrenRefs = childrenRefs.add(actor)
reserveChild(name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
provider.actorOf(systemImpl, props, self, self.path / name,
systemService = false, deploy = None, lookupDeploy = true, async = async)
} catch {
case NonFatal(e)
unreserveChild(name)
throw e
}
addChild(actor)
actor
}
}
def actorOf(props: Props): ActorRef = _actorOf(props, randomName())
def actorOf(props: Props): ActorRef = _actorOf(props, randomName(), async = false)
def actorOf(props: Props, name: String): ActorRef = {
def actorOf(props: Props, name: String): ActorRef = _actorOf(props, checkName(name), async = false)
private def checkName(name: String): String = {
import ActorPath.ElementRegex
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() // this is fine
case ElementRegex() name
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
childrenRefs.getByName(name) match {
case None _actorOf(props, name)
case _ throw new InvalidActorNameException("actor name " + name + " is not unique!")
}
}
private[akka] def attachChild(props: Props, name: String): ActorRef =
_actorOf(props, checkName(name), async = true)
private[akka] def attachChild(props: Props): ActorRef =
_actorOf(props, randomName(), async = true)
final def stop(actor: ActorRef): Unit = {
if (childrenRefs.getByRef(actor).isDefined) childrenRefs = childrenRefs.shallDie(actor)
val started = actor match {
case r: RepointableRef r.isStarted
case _ true
}
if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor)
actor.asInstanceOf[InternalActorRef].stop()
}
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell
@volatile private var _nextNameDoNotCallMeDirectly = 0L
final protected def randomName(): String = {
val n = nextNameSequence
nextNameSequence = n + 1
Helpers.base64(n)
@tailrec def inc(): Long = {
val current = Unsafe.instance.getLongVolatile(this, nextNameOffset)
if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current
else inc()
}
Helpers.base64(inc())
}
@inline
final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
@volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
/**
* INTERNAL API
@ -442,6 +620,12 @@ private[akka] class ActorCell(
else oldMailbox
}
final def hasMessages: Boolean = mailbox.hasMessages
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* UntypedActorContext impl
*/
@ -449,20 +633,22 @@ private[akka] class ActorCell(
final def isTerminated: Boolean = mailbox.isClosed
final def start(): Unit = {
final def start(): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
swapMailbox(dispatcher.createMailbox(this))
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
@ -500,8 +686,10 @@ private[akka] class ActorCell(
final def getChildren(): java.lang.Iterable[ActorRef] =
scala.collection.JavaConverters.asJavaIterableConverter(children).asJava
final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system))
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
final def sender: ActorRef = currentMessage match {
case null system.deadLetters
@ -564,7 +752,7 @@ private[akka] class ActorCell(
}
childrenRefs match {
case ct: TerminatingChildrenContainer
childrenRefs = ct.copy(reason = Recreation(cause))
setChildrenTerminationReason(Recreation(cause))
dispatcher suspend this
case _
doRecreate(cause, failedActor)
@ -622,7 +810,7 @@ private[akka] class ActorCell(
childrenRefs match {
case ct: TerminatingChildrenContainer
childrenRefs = ct.copy(reason = Termination)
setChildrenTerminationReason(Termination)
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
@ -631,7 +819,8 @@ private[akka] class ActorCell(
}
def supervise(child: ActorRef): Unit = if (!isTerminating) {
if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child)
if (childrenRefs.getByRef(child).isEmpty) addChild(child)
handleSupervise(child)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
@ -646,6 +835,7 @@ private[akka] class ActorCell(
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
@ -706,6 +896,7 @@ private[akka] class ActorCell(
msg.message match {
case Failed(cause) handleFailure(sender, cause)
case t: Terminated watching -= t.actor; receiveMessage(t)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
@ -794,8 +985,7 @@ private[akka] class ActorCell(
final def handleChildTerminated(child: ActorRef): Unit = try {
childrenRefs match {
case tc @ TerminatingChildrenContainer(_, _, reason)
val n = tc.remove(child)
childrenRefs = n
val n = removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
@ -803,7 +993,7 @@ private[akka] class ActorCell(
case _
}
case _
childrenRefs = childrenRefs.remove(child)
removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} catch {
@ -816,6 +1006,11 @@ private[akka] class ActorCell(
}
}
protected def handleSupervise(child: ActorRef): Unit = child match {
case r: RepointableActorRef r.activate()
case _
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))

View file

@ -192,7 +192,7 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto
// TODO RK investigate Phils hash from scala.collection.mutable.HashTable.improve
override def hashCode: Int = {
import scala.util.MurmurHash._
import akka.routing.MurmurHash._
@tailrec
def rec(p: ActorPath, h: Int, c: Int, k: Int): Int = p match {

View file

@ -163,10 +163,24 @@ private[akka] trait ActorRefScope {
def isLocal: Boolean
}
/**
* Refs which are statically known to be local inherit from this Scope
*/
private[akka] trait LocalRef extends ActorRefScope {
final def isLocal = true
}
/**
* RepointableActorRef (and potentially others) may change their locality at
* runtime, meaning that isLocal might not be stable. RepointableActorRef has
* the feature that it starts out not fully started (but you can send to it),
* which is why `isStarted` features here; it is not improbable that cluster
* actor refs will have the same behavior.
*/
private[akka] trait RepointableRef extends ActorRefScope {
def isStarted: Boolean
}
/**
* Internal trait for assembling all the functionality needed internally on
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
@ -210,6 +224,16 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
def isLocal: Boolean
}
/**
* Common trait of all actor refs which actually have a Cell, most notably
* LocalActorRef and RepointableActorRef. The former specializes the return
* type of `underlying` so that follow-up calls can use invokevirtual instead
* of invokeinterface.
*/
private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope
def underlying: Cell
}
/**
* This is an internal look-up failure token, not useful for anything else.
*/
@ -228,21 +252,21 @@ private[akka] class LocalActorRef private[akka] (
_props: Props,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends InternalActorRef with LocalRef {
extends ActorRefWithCell with LocalRef {
/*
* actorCell.start() publishes actorCell & this to the dispatcher, which
* means that messages may be processed theoretically before the constructor
* ends. The JMM guarantees visibility for final fields only after the end
* of the constructor, so publish the actorCell safely by making it a
* @volatile var which is NOT TO BE WRITTEN TO. The alternative would be to
* move start() outside of the constructor, which would basically require
* us to use purely factory methods for creating LocalActorRefs.
* Safe publication of this classs fields is guaranteed by mailbox.setActor()
* which is called indirectly from actorCell.start() (if youre wondering why
* this is at all important, remember that under the JMM final fields are only
* frozen at the _end_ of the constructor, but we are publishing this before
* that is reached).
*/
@volatile
private var actorCell = newActorCell(_system, this, _props, _supervisor)
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
actorCell.start()
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
_supervisor.sendSystemMessage(akka.dispatch.Supervise(this))
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor)
@ -313,9 +337,9 @@ private[akka] class LocalActorRef private[akka] (
// ========= AKKA PROTECTED FUNCTIONS =========
protected[akka] def underlying: ActorCell = actorCell
def underlying: ActorCell = actorCell
override def sendSystemMessage(message: SystemMessage): Unit = underlying.dispatcher.systemDispatch(underlying, message)
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)

View file

@ -26,12 +26,12 @@ trait ActorRefProvider {
/**
* Reference to the supervisor used for all top-level user actors.
*/
def guardian: InternalActorRef
def guardian: LocalActorRef
/**
* Reference to the supervisor used for all top-level system actors.
*/
def systemGuardian: InternalActorRef
def systemGuardian: LocalActorRef
/**
* Dead letter destination for this provider.
@ -104,7 +104,8 @@ trait ActorRefProvider {
path: ActorPath,
systemService: Boolean,
deploy: Option[Deploy],
lookupDeploy: Boolean): InternalActorRef
lookupDeploy: Boolean,
async: Boolean): InternalActorRef
/**
* Create actor reference for a specified local or remote path. If no such
@ -481,11 +482,10 @@ class LocalActorRefProvider(
}
}
lazy val guardian: InternalActorRef =
actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None, false)
lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user")
lazy val systemGuardian: InternalActorRef =
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None, false)
lazy val systemGuardian: LocalActorRef =
new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system")
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
@ -539,22 +539,20 @@ class LocalActorRefProvider(
}
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = {
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = {
props.routerConfig match {
case NoRouter new LocalActorRef(system, props, supervisor, path) // create a local actor
case NoRouter
if (async) new RepointableActorRef(system, props, supervisor, path).initialize()
else new LocalActorRef(system, props, supervisor, path)
case router
val lookup = if (lookupDeploy) deployer.lookup(path) else None
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) b withFallback a)
new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path)
val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize()
if (async) ref else ref.activate()
}
}
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
}
private[akka] class GuardianCell(_system: ActorSystemImpl, _self: InternalActorRef, _props: Props, _parent: InternalActorRef)
extends ActorCell(_system, _self, _props, _parent) {
}

View file

@ -422,6 +422,13 @@ abstract class ExtendedActorSystem extends ActorSystem {
* creation.
*/
def dynamicAccess: DynamicAccess
/**
* For debugging: traverse actor hierarchy and make string representation.
* Careful, this may OOM on large actor systems, and it is only meant for
* helping debugging in case something already went terminally wrong.
*/
private[akka] def printTree: String
}
private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem {
@ -479,20 +486,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
protected def systemImpl: ActorSystemImpl = this
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result((systemGuardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
}
private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name)
def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result((guardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
}
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name)
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result((guardian ? CreateRandomNameChild(props)).mapTo[ActorRef], timeout.duration)
}
def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props)
def stop(actor: ActorRef): Unit = {
implicit val timeout = settings.CreationTimeout
@ -539,10 +537,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def dequeue() = null
def hasMessages = false
def numberOfMessages = 0
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
}
//FIXME Why do we need this at all?
val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) {
val deadLetterMailbox: Mailbox = new Mailbox(deadLetterQueue) {
becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver)
@ -557,8 +555,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def terminationFuture: Future[Unit] = provider.terminationFuture
def lookupRoot: InternalActorRef = provider.rootGuardian
def guardian: InternalActorRef = provider.guardian
def systemGuardian: InternalActorRef = provider.systemGuardian
def guardian: LocalActorRef = provider.guardian
def systemGuardian: LocalActorRef = provider.systemGuardian
def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path
@ -682,6 +680,31 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
override def toString: String = lookupRoot.path.root.address.toString
override def printTree: String = {
def printNode(node: ActorRef, indent: String): String = {
node match {
case wc: ActorRefWithCell
val cell = wc.underlying
indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " +
(cell match {
case real: ActorCell if (real.actor ne null) real.actor.getClass else "null"
case _ Logging.simpleName(cell)
}) +
" " + (cell.childrenRefs match {
case ActorCell.TerminatingChildrenContainer(_, toDie, reason)
"Terminating(" + reason + ")" +
(toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", ""))
case x Logging.simpleName(x)
}) +
(if (cell.childrenRefs.children.isEmpty) "" else "\n") +
(cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n"))
case _
indent + node.path.name + " " + Logging.simpleName(node)
}
}
printNode(actorFor("/"), "")
}
final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
private val lock = new ReentrantGuard
private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock

View file

@ -9,11 +9,22 @@ import scala.collection.JavaConversions._
import java.lang.{ Iterable JIterable }
import akka.util.Duration
/**
* INTERNAL API
*/
private[akka] sealed trait ChildStats
/**
* INTERNAL API
*/
private[akka] case object ChildNameReserved extends ChildStats
/**
* ChildRestartStats is the statistics kept by every parent Actor for every child Actor
* and is used for SupervisorStrategies to know how to deal with problems that occur for the children.
*/
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L)
extends ChildStats {
//FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies?
def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =

View file

@ -0,0 +1,214 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.SystemMessage
import akka.dispatch.Mailbox
import akka.dispatch.Terminate
import akka.dispatch.Envelope
import akka.dispatch.Supervise
import akka.dispatch.Create
import akka.dispatch.MessageDispatcher
import java.util.concurrent.locks.ReentrantLock
import akka.event.Logging.Warning
import scala.collection.mutable.Queue
/**
* This actor ref starts out with some dummy cell (by default just enqueuing
* messages into vectors protected by ReentrantLock), it must be initialize()d
* before it can be sent to, and it will be activate()d by its supervisor in
* response to the Supervise() message, which will replace the contained Cell
* with a fully functional one, transfer all messages from dummy to real queue
* and swap out the cell ref.
*/
private[akka] class RepointableActorRef(
val system: ActorSystemImpl,
val props: Props,
val supervisor: InternalActorRef,
val path: ActorPath)
extends ActorRefWithCell with RepointableRef {
import AbstractActorRef.cellOffset
@volatile private var _cellDoNotCallMeDirectly: Cell = _
def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell]
@tailrec final def swapCell(next: Cell): Cell = {
val old = underlying
if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next)
}
/**
* Initialize: make a dummy cell which holds just a mailbox, then tell our
* supervisor that we exist so that he can create the real Cell in
* handleSupervise().
*
* Call twice on your own peril!
*
* This is protected so that others can have different initialization.
*/
def initialize(): this.type = {
swapCell(new UnstartedCell(system, this, props, supervisor))
supervisor.sendSystemMessage(Supervise(this))
this
}
/**
* This method is supposed to be called by the supervisor in handleSupervise()
* to replace the UnstartedCell with the real one. It assumes no concurrent
* modification of the `underlying` field, though it is safe to send messages
* at any time.
*/
def activate(): this.type = {
underlying match {
case u: UnstartedCell u.replaceWith(newCell())
case _ // this happens routinely for things which were created async=false
}
this
}
/**
* This is called by activate() to obtain the cell which is to replace the
* unstarted cell. The cell must be fully functional.
*/
def newCell(): Cell = new ActorCell(system, this, props, supervisor).start()
def suspend(): Unit = underlying.suspend()
def resume(): Unit = underlying.resume()
def stop(): Unit = underlying.stop()
def restart(cause: Throwable): Unit = underlying.restart(cause)
def isStarted: Boolean = !underlying.isInstanceOf[UnstartedCell]
def isTerminated: Boolean = underlying.isTerminated
def provider: ActorRefProvider = system.provider
def isLocal: Boolean = underlying.isLocal
def getParent: InternalActorRef = underlying.parent
def getChild(name: Iterator[String]): InternalActorRef =
if (name.hasNext) {
name.next match {
case ".." getParent.getChild(name)
case "" getChild(name)
case other
underlying.childrenRefs.getByName(other) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case None Nobody
}
}
} else this
def !(message: Any)(implicit sender: ActorRef = null) = underlying.tell(message, sender)
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path)
}
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef)
extends Cell {
/*
* This lock protects all accesses to this cells queues. It also ensures
* safe switching to the started ActorCell.
*/
val lock = new ReentrantLock
// use Envelope to keep on-send checks in the same place
val queue: Queue[Envelope] = Queue()
val systemQueue: Queue[SystemMessage] = Queue()
def replaceWith(cell: Cell): Unit = {
lock.lock()
try {
/*
* The CallingThreadDispatcher nicely dives under the ReentrantLock and
* breaks things by enqueueing into stale queues from within the message
* processing which happens in-line for sendSystemMessage() and tell().
* Since this is the only possible way to f*ck things up within this
* lock, double-tap (well, N-tap, really); concurrent modification is
* still not possible because were the only thread accessing the queues.
*/
var interrupted = false
while (systemQueue.nonEmpty || queue.nonEmpty) {
while (systemQueue.nonEmpty) {
val msg = systemQueue.dequeue()
try cell.sendSystemMessage(msg)
catch {
case _: InterruptedException interrupted = true
}
}
if (queue.nonEmpty) {
val envelope = queue.dequeue()
try cell.tell(envelope.message, envelope.sender)
catch {
case _: InterruptedException interrupted = true
}
}
}
if (interrupted) throw new InterruptedException
} finally try
self.swapCell(cell)
finally
lock.unlock()
}
def system: ActorSystem = systemImpl
def suspend(): Unit = {}
def resume(): Unit = {}
def restart(cause: Throwable): Unit = {}
def stop(): Unit = sendSystemMessage(Terminate())
def isTerminated: Boolean = false
def parent: InternalActorRef = supervisor
def childrenRefs: ActorCell.ChildrenContainer = ActorCell.EmptyChildrenContainer
def tell(message: Any, sender: ActorRef): Unit = {
lock.lock()
try {
if (self.underlying eq this) queue enqueue Envelope(message, sender, system)
else self.underlying.tell(message, sender)
} finally {
lock.unlock()
}
}
def sendSystemMessage(msg: SystemMessage): Unit = {
lock.lock()
try {
if (self.underlying eq this) systemQueue enqueue msg
else self.underlying.sendSystemMessage(msg)
} finally {
lock.unlock()
}
}
def isLocal = true
def hasMessages: Boolean = {
lock.lock()
try {
if (self.underlying eq this) !queue.isEmpty
else self.underlying.hasMessages
} finally {
lock.unlock()
}
}
def numberOfMessages: Int = {
lock.lock()
try {
if (self.underlying eq this) queue.size
else self.underlying.numberOfMessages
} finally {
lock.unlock()
}
}
}

View file

@ -592,7 +592,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* Returns the akka.actor.Props representation of this TypedProps
*/
def actorProps(): Props = if (dispatcher == Props().dispatcher) Props() else Props(dispatcher = dispatcher)
def actorProps(): Props = if (dispatcher == Props.default.dispatcher) Props.default else Props(dispatcher = dispatcher)
}
/**

View file

@ -16,8 +16,10 @@ import akka.event.Logging.LogEventException
import akka.jsr166y.{ ForkJoinTask, ForkJoinPool }
import akka.util.{ Unsafe, Duration, NonFatal, Index }
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
final case class Envelope private (val message: Any, val sender: ActorRef)
object Envelope {
def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = {
val msg = message.asInstanceOf[AnyRef]
if (msg eq null) throw new InvalidMessageException("Message is null")
if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) {
@ -30,6 +32,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS
}
}
}
new Envelope(message, sender)
}
}
@ -228,8 +231,8 @@ private[akka] object MessageDispatcher {
} {
val status = if (a.isTerminated) " (terminated)" else " (alive)"
val messages = a match {
case l: LocalActorRef " " + l.underlying.mailbox.numberOfMessages + " messages"
case _ " " + a.getClass
case r: ActorRefWithCell " " + r.underlying.numberOfMessages + " messages"
case _ " " + a.getClass
}
val parent = a match {
case i: InternalActorRef ", parent: " + i.getParent
@ -265,7 +268,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
/**
* Creates and returns a mailbox for the given actor.
*/
protected[akka] def createMailbox(actor: ActorCell): Mailbox //FIXME should this really be private[akka]?
protected[akka] def createMailbox(actor: Cell): Mailbox //FIXME should this really be private[akka]?
/**
* Identifier of this dispatcher, corresponds to the full key

View file

@ -9,6 +9,7 @@ import annotation.tailrec
import akka.util.{ Duration, Helpers }
import java.util.{ Comparator, Iterator }
import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet }
import akka.actor.ActorSystemImpl
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -46,24 +47,25 @@ class BalancingDispatcher(
/**
* INTERNAL USE ONLY
*/
private[akka] val messageQueue: MessageQueue = mailboxType.create(None)
private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None)
private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue {
private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue)
extends Mailbox(_messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = {
val dlq = actor.systemImpl.deadLetterMailbox
val dlq = system.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
var message = systemDrain(NoMessage)
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
dlq.systemEnqueue(system.deadLetters, message)
message = next
}
}
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue)
protected[akka] override def createMailbox(actor: akka.actor.Cell): Mailbox = new SharingMailbox(actor.systemImpl, messageQueue)
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)

View file

@ -82,7 +82,8 @@ class Dispatcher(
/**
* INTERNAL USE ONLY
*/
protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue
protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox =
new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
/**
* INTERNAL USE ONLY

View file

@ -6,6 +6,7 @@ package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef, Cell }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
@ -41,11 +42,32 @@ private[akka] object Mailbox {
*
* INTERNAL API
*/
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
import Mailbox._
/*
* This is needed for actually executing the mailbox, i.e. invoking the
* ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox
* is constructed but we know that we will not execute it, in which case this
* will be null. It must be a var to support switching into an active
* mailbox, should the owning ActorRef turn local.
*
* ANOTHER THING, IMPORTANT:
*
* actorCell.start() publishes actorCell & self to the dispatcher, which
* means that messages may be processed theoretically before selfs constructor
* ends. The JMM guarantees visibility for final fields only after the end
* of the constructor, so safe publication requires that THIS WRITE BELOW
* stay as it is.
*/
@volatile
var actor: ActorCell = _
def setActor(cell: ActorCell): Unit = actor = cell
def dispatcher: MessageDispatcher = actor.dispatcher
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
@ -230,11 +252,12 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = actor.systemImpl.deadLetterMailbox
while (nextMessage ne null) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
try actor.systemImpl.deadLetterMailbox.systemEnqueue(actor.self, msg)
try dlm.systemEnqueue(actor.self, msg)
catch {
case NonFatal(e) actor.system.eventStream.publish(
Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage))
@ -244,9 +267,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage)
}
@inline
final def dispatcher: MessageDispatcher = actor.dispatcher
/**
* Overridable callback to clean up the mailbox,
* called when an actor is unregistered.
@ -265,7 +285,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
}
}
@ -303,7 +323,7 @@ trait MessageQueue {
* which is passed in. The owner of this MessageQueue is passed in if
* available (e.g. for creating DeadLetters()), /deadletters otherwise.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}
/**
@ -331,10 +351,11 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
if (Mailbox.debug) println(receiver + " having enqueued " + message)
val head = systemQueueGet
if (head == NoMessage) actor.system.deadLetterMailbox.systemEnqueue(receiver, message)
else {
if (head == NoMessage) {
if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message)
} else {
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
@ -366,11 +387,11 @@ trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
deadLetters.enqueue(owner, envelope)
envelope = dequeue
}
}
@ -445,10 +466,20 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
}
/**
* MailboxType is a factory to create MessageQueues for an optionally provided ActorContext
* MailboxType is a factory to create MessageQueues for an optionally
* provided ActorContext.
*
* <b>Possibly Important Notice</b>
*
* When implementing a custom mailbox type, be aware that there is special
* semantics attached to `system.actorOf()` in that sending to the returned
* ActorRef mayfor a short period of timeenqueue the messages first in a
* dummy queue. Top-level actors are created in two steps, and only after the
* guardian actor has performed that second step will all previously sent
* messages be transferred from the dummy queue into the real mailbox.
*/
trait MailboxType {
def create(owner: Option[ActorContext]): MessageQueue
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}
/**
@ -458,7 +489,7 @@ case class UnboundedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
@ -475,7 +506,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
@ -488,7 +519,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
@ -503,7 +534,7 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
@ -517,7 +548,7 @@ case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
}
@ -534,7 +565,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut

View file

@ -23,42 +23,28 @@ import scala.runtime.ScalaRunTime
* send a message to on (or more) of these actors.
*/
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends LocalActorRef(
_system,
_props.copy(creator = () _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher),
_supervisor,
_path) {
extends RepointableActorRef(_system, _props, _supervisor, _path) {
/*
* CAUTION: RoutedActorRef is PROBLEMATIC
* ======================================
*
* We are constructing/assembling the children outside of the scope of the
* Router actor, inserting them in its childrenRef list, which is not at all
* synchronized. This is done exactly once at start-up, all other accesses
* are done from the Router actor. This means that the only thing which is
* really hairy is making sure that the Router does not touch its childrenRefs
* before we are done with them: lock the monitor of the actor cell (hence the
* override of newActorCell) and use that to block the Router constructor for
* as long as it takes to setup the RoutedActorRef itself.
*
* ===> I M P O R T A N T N O T I C E <===
*
* DO NOT THROW ANY EXCEPTIONS BEFORE THE FOLLOWING TRY-BLOCK WITHOUT
* EXITING THE MONITOR OF THE actorCell!
*
* This is important, just dont do it! No kidding.
*/
override def newActorCell(
system: ActorSystemImpl,
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef): ActorCell = {
val cell = super.newActorCell(system, ref, props, supervisor)
Unsafe.instance.monitorEnter(cell)
cell
// verify that a BalancingDispatcher is not used with a Router
if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) {
throw new ConfigurationException(
"Configuration for " + this +
" is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.")
}
_props.routerConfig.verifyConfig()
override def newCell(): Cell = new RoutedActorCell(system, this, props, supervisor)
}
private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef)
extends ActorCell(
_system,
_ref,
_props.copy(creator = () _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher),
_supervisor) {
private[akka] val routerConfig = _props.routerConfig
private[akka] val routeeProps = _props.copy(routerConfig = NoRouter)
private[akka] val resizeInProgress = new AtomicBoolean
@ -72,39 +58,28 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
private var _routeeProvider: RouteeProvider = _
def routeeProvider = _routeeProvider
val route =
try {
// verify that a BalancingDispatcher is not used with a Router
if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) {
actorContext.stop(actorContext.self)
throw new ConfigurationException(
"Configuration for actor [" + _path.toString +
"] is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.")
}
_routeeProvider = routerConfig.createRouteeProvider(actorContext)
val r = routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
routerConfig.resizer foreach { r
if (r.isTimeForResize(resizeCounter.getAndIncrement()))
r.resize(routeeProps, routeeProvider)
}
r
} finally {
assert(Thread.holdsLock(actorContext))
Unsafe.instance.monitorExit(actorContext) // unblock Routers constructor
val route = {
_routeeProvider = routerConfig.createRouteeProvider(this)
val r = routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
routerConfig.resizer foreach { r
if (r.isTimeForResize(resizeCounter.getAndIncrement()))
r.resize(routeeProps, routeeProvider)
}
r
}
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
start()
/*
* end of construction
*/
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Destination(this, this) :: Nil
case Terminated(_) Destination(this, this) :: Nil
case _: AutoReceivedMessage Destination(self, self) :: Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
@ -122,7 +97,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = {
_routees = _routees ++ newRoutees
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
newRoutees foreach underlying.watch
newRoutees foreach watch
}
/**
@ -133,13 +108,13 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
*/
private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]): Unit = {
_routees = _routees diff abandonedRoutees
abandonedRoutees foreach underlying.unwatch
abandonedRoutees foreach unwatch
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
override def tell(message: Any, sender: ActorRef): Unit = {
resize()
val s = if (sender eq null) underlying.system.deadLetters else sender
val s = if (sender eq null) system.deadLetters else sender
val msg = message match {
case Broadcast(m) m
@ -147,15 +122,18 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
}
applyRoute(s, message) match {
case Destination(_, x) :: Nil if x eq this super.!(message)(s)
case refs refs foreach (p p.recipient.!(msg)(p.sender))
case Destination(_, x) :: Nil if x == self super.tell(message, s)
case refs
refs foreach (p
if (p.recipient == self) super.tell(msg, p.sender)
else p.recipient.!(msg)(p.sender))
}
}
def resize(): Unit = {
for (r routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true))
super.!(Router.Resize)
super.tell(Router.Resize, self)
}
}
}
@ -212,6 +190,11 @@ trait RouterConfig {
*/
def resizer: Option[Resizer] = None
/**
* Check that everything is there which is needed. Called in constructor of RoutedActorRef to fail early.
*/
def verifyConfig(): Unit = {}
}
/**
@ -227,7 +210,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
* Not thread safe, but intended to be called from protected points, such as
* `RouterConfig.createRoute` and `Resizer.resize`.
*/
def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedRef.addRoutees(routees)
def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.addRoutees(routees)
/**
* Adds the routees to the router.
@ -247,7 +230,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
* Not thread safe, but intended to be called from protected points, such as
* `Resizer.resize`.
*/
def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedRef.removeRoutees(routees)
def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.removeRoutees(routees)
def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] =
(nrOfInstances, routees) match {
@ -264,9 +247,9 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
/**
* All routees of the router
*/
def routees: IndexedSeq[ActorRef] = routedRef.routees
def routees: IndexedSeq[ActorRef] = routedCell.routees
private def routedRef = context.self.asInstanceOf[RoutedActorRef]
private def routedCell = context.asInstanceOf[RoutedActorCell]
}
/**
@ -298,12 +281,9 @@ trait CustomRoute {
*/
trait Router extends Actor {
// make sure that we synchronize properly to get the childrenRefs into our CPU cache
val ref = context.synchronized {
self match {
case x: RoutedActorRef x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
}
val ref = context match {
case x: RoutedActorCell x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
}
final def receive = ({
@ -417,8 +397,10 @@ class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
def this() = this(Dispatchers.DefaultDispatcherId)
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
override def verifyConfig(): Unit =
throw new ConfigurationException("router needs external configuration from file (e.g. application.conf)")
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}
@ -774,9 +756,11 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
* routers based on mailbox and actor internal state.
*/
protected def isProcessingMessage(a: ActorRef): Boolean = a match {
case x: LocalActorRef
val cell = x.underlying
cell.mailbox.isScheduled && cell.currentMessage != null
case x: ActorRefWithCell
x.underlying match {
case cell: ActorCell cell.mailbox.isScheduled && cell.currentMessage != null
case _ false
}
case _ false
}
@ -788,8 +772,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
* routers based on mailbox and actor internal state.
*/
protected def hasMessages(a: ActorRef): Boolean = a match {
case x: LocalActorRef x.underlying.mailbox.hasMessages
case _ false
case x: ActorRefWithCell x.underlying.hasMessages
case _ false
}
/**
@ -799,8 +783,12 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
* routers based on mailbox and actor internal state.
*/
protected def isSuspended(a: ActorRef): Boolean = a match {
case x: LocalActorRef x.underlying.mailbox.isSuspended
case _ false
case x: ActorRefWithCell
x.underlying match {
case cell: ActorCell cell.mailbox.isSuspended
case _ true
}
case _ false
}
/**
@ -810,8 +798,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
* routers based on mailbox and actor internal state.
*/
protected def numberOfMessages(a: ActorRef): Int = a match {
case x: LocalActorRef x.underlying.mailbox.numberOfMessages
case _ 0
case x: ActorRefWithCell x.underlying.numberOfMessages
case _ 0
}
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
@ -1283,12 +1271,20 @@ case class DefaultResizer(
*/
def pressure(routees: IndexedSeq[ActorRef]): Int = {
routees count {
case a: LocalActorRef
val cell = a.underlying
pressureThreshold match {
case 1 cell.mailbox.isScheduled && cell.mailbox.hasMessages
case i if i < 1 cell.mailbox.isScheduled && cell.currentMessage != null
case threshold cell.mailbox.numberOfMessages >= threshold
case a: ActorRefWithCell
a.underlying match {
case cell: ActorCell
pressureThreshold match {
case 1 cell.mailbox.isScheduled && cell.mailbox.hasMessages
case i if i < 1 cell.mailbox.isScheduled && cell.currentMessage != null
case threshold cell.mailbox.numberOfMessages >= threshold
}
case cell
pressureThreshold match {
case 1 cell.hasMessages
case i if i < 1 true // unstarted cells are always busy, for example
case threshold cell.numberOfMessages >= threshold
}
}
case x
false

View file

@ -97,7 +97,7 @@ object Agent {
*/
class Agent[T](initialValue: T, system: ActorSystem) {
private val ref = Ref(initialValue)
private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow?
private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow?
/**
* Read the internal state of the agent.

View file

@ -36,6 +36,10 @@ akka {
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
unreachable-nodes-reaper-interval = 1s
# A joining node stops sending heartbeats to the node to join if it hasn't become member
# of the cluster within this deadline.
join-timeout = 60s
failure-detector {
# defines the failure detector threshold

View file

@ -85,11 +85,7 @@ class AccrualFailureDetector(
settings.FailureDetectorMaxSampleSize,
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
// we use a conservative estimate for the first heartbeat because
// gossip needs to spread back to the joining node before the
// first real heartbeat is sent. Initial heartbeat is added when joining.
// FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed
settings.GossipInterval * 3 + settings.HeartbeatInterval,
settings.HeartbeatInterval,
clock)
private val log = Logging(system, "FailureDetector")

View file

@ -194,8 +194,8 @@ object MemberStatus {
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
unreachable: Set[Member] = Set.empty[Member]) {
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
@ -241,7 +241,7 @@ object Gossip {
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member], // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
meta: Map[String, Array[Byte]] = Map.empty,
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
@ -461,7 +461,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
private case class State(
latestGossip: Gossip,
memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
joinInProgress: Map[Address, Deadline] = Map.empty,
memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
@ -672,11 +673,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
@tailrec
final def join(address: Address): Unit = {
val localState = state.get
val newState = localState copy (joinInProgress = localState.joinInProgress +
(address -> (Deadline.now + JoinTimeout)))
if (!state.compareAndSet(localState, newState)) join(address) // recur
else {
val connection = clusterCommandConnectionFor(address)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
}
/**
@ -891,7 +899,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localGossip = localState.latestGossip
val winningGossip =
if (remoteGossip.version <> localGossip.version) {
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
// a fresh singleton cluster that is joining, no need to merge, use received gossip
remoteGossip
} else if (remoteGossip.version <> localGossip.version) {
// concurrent
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode
@ -911,7 +923,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
remoteGossip
}
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
val newJoinInProgress =
if (localState.joinInProgress.isEmpty) localState.joinInProgress
else localState.joinInProgress --
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
val newState = localState copy (
latestGossip = winningGossip seen selfAddress,
joinInProgress = newJoinInProgress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
@ -1021,16 +1041,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* INTERNAL API.
*/
private[cluster] def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val localState = state.get
if (!isSingletonCluster(localState)) {
val liveMembers = localState.latestGossip.members.toIndexedSeq
val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys
for (member liveMembers; if member.address != selfAddress) {
val connection = clusterGossipConnectionFor(member.address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! selfHeartbeat
}
for (address beatTo; if address != selfAddress) {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! selfHeartbeat
}
}
@ -1078,6 +1097,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API.
*
* Removes overdue joinInProgress from State.
*/
@tailrec
final private[cluster] def removeOverdueJoinInProgress(): Unit = {
val localState = state.get
val overdueJoins = localState.joinInProgress collect {
case (address, deadline) if deadline.isOverdue address
}
if (overdueJoins.nonEmpty) {
val newState = localState copy (joinInProgress = localState.joinInProgress -- overdueJoins)
if (!state.compareAndSet(localState, newState)) removeOverdueJoinInProgress() // recur
}
}
/**
* INTERNAL API.
*

View file

@ -26,14 +26,15 @@ class ClusterSettings(val config: Config, val systemName: String) {
case "" None
case AddressFromURIString(addr) Some(addr)
}
final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
final val AutoDown = getBoolean("akka.cluster.auto-down")
final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel")
final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons")
final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes")
final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel")
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.util.Deadline
object JoinInProgressMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
# simulate delay in gossip by turning it off
gossip-interval = 300 s
failure-detector {
threshold = 4
acceptable-heartbeat-pause = 1 second
}
}""") // increase the leader action task interval
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec with AccrualFailureDetectorStrategy
class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec with AccrualFailureDetectorStrategy
abstract class JoinInProgressSpec
extends MultiNodeSpec(JoinInProgressMultiJvmSpec)
with MultiNodeClusterSpec {
import JoinInProgressMultiJvmSpec._
"A cluster node" must {
"send heartbeats immediately when joining to avoid false failure detection due to delayed gossip" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
enterBarrier("first-started")
runOn(second) {
cluster.join(first)
}
runOn(first) {
val until = Deadline.now + 5.seconds
while (!until.isOverdue) {
200.millis.sleep
cluster.failureDetector.isAvailable(second) must be(true)
}
}
enterBarrier("after")
}
}
}

View file

@ -19,11 +19,11 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
val fourth = role("fourth")
val fifth = role("fifth")
// Note that this test uses default configuration,
// not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString("""
akka.cluster {
nr-of-deputy-nodes = 0
# FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms
}
akka.loglevel = INFO
"""))

View file

@ -68,7 +68,7 @@ abstract class TransitionSpec
}
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond {
memberStatus(address) == Up
memberStatus(address) == status
}
// DSL sugar for `role1 gossipTo role2`
@ -118,33 +118,18 @@ abstract class TransitionSpec
awaitMembers(first, second)
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
seenLatestGossip must be(Set(first))
cluster.convergence.isDefined must be(false)
}
enterBarrier("second-joined")
first gossipTo second
runOn(second) {
members must be(Set(first, second))
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
// we got a conflicting version in second, and therefore not convergence in second
seenLatestGossip must be(Set(second))
cluster.convergence.isDefined must be(false)
}
second gossipTo first
runOn(first) {
seenLatestGossip must be(Set(first, second))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second))
}
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
enterBarrier("convergence-joining-2")
@ -191,42 +176,20 @@ abstract class TransitionSpec
second gossipTo first
runOn(first) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(false)
}
first gossipTo third
runOn(third) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
// conflicting version
seenLatestGossip must be(Set(third))
}
third gossipTo first
third gossipTo second
runOn(first, second) {
seenLatestGossip must be(Set(myself, third))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
runOn(first, third) {
cluster.convergence.isDefined must be(false)
}
second gossipTo first
second gossipTo third
runOn(first, second, third) {
seenLatestGossip must be(Set(first, second, third))
members must be(Set(first, second, third))
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
@ -283,19 +246,21 @@ abstract class TransitionSpec
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(fifth)
awaitMembers(fourth, fifth)
cluster.gossipTo(fifth)
awaitSeen(fourth, fifth)
cluster.convergence.isDefined must be(true)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
cluster.gossipTo(fourth)
awaitSeen(fourth, fifth)
cluster.gossipTo(fourth)
}
testConductor.enter("fourth-joined")
fifth gossipTo fourth
fourth gossipTo fifth
runOn(fourth, fifth) {
memberStatus(fourth) must be(Joining)
memberStatus(fifth) must be(Up)
seenLatestGossip must be(Set(fourth, fifth))
cluster.convergence.isDefined must be(true)
}
enterBarrier("fourth-joined-fifth")
enterBarrier("after-4")
}
@ -387,8 +352,7 @@ abstract class TransitionSpec
enterBarrier("after-5")
}
// FIXME ignored due to #2259
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest ignore {
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in {
runOn(fifth) {
markNodeAsUnavailable(second)
cluster.reapUnreachableMembers()

View file

@ -27,6 +27,7 @@ class ClusterConfigSpec extends AkkaSpec {
HeartbeatInterval must be(1 second)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
JoinTimeout must be(60 seconds)
NrOfGossipDaemons must be(4)
NrOfDeputyNodes must be(3)
AutoDown must be(true)

View file

@ -24,6 +24,15 @@ import com.typesafe.config.Config;
//#imports-prio-mailbox
//#imports-custom
import akka.dispatch.Envelope;
import akka.dispatch.MessageQueue;
import akka.dispatch.MailboxType;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
//#imports-custom
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -136,4 +145,32 @@ public class DispatcherDocTestBase {
}
}
//#prio-mailbox
//#mailbox-implementation-example
class MyUnboundedMailbox implements MailboxType {
// This constructor signature must exist, it will be called by Akka
public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
// put your initialization code here
}
// The create method is called to create the MessageQueue
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
return new MessageQueue() {
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
// these must be implemented; queue used as example
public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); }
public Envelope dequeue() { return queue.poll(); }
public int numberOfMessages() { return queue.size(); }
public boolean hasMessages() { return !queue.isEmpty(); }
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
for (Envelope handle: queue) {
deadLetters.enqueue(owner, handle);
}
}
};
}
}
//#mailbox-implementation-example
}

View file

@ -183,3 +183,46 @@ And then an example on how you would use it:
the configuration which describes the dispatcher using this mailbox type; the
mailbox type will be instantiated once for each dispatcher using it.
Creating your own Mailbox type
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
An example is worth a thousand quacks:
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#imports-custom
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration.
.. note::
Make sure to include a constructor which takes
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config``
arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the
mailbox type will be instantiated once for each dispatcher using it.
Special Semantics of ``system.actorOf``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In order to make ``system.actorOf`` both synchronous and non-blocking while
keeping the return type :class:`ActorRef` (and the semantics that the returned
ref is fully functional), special handling takes place for this case. Behind
the scenes, a hollow kind of actor reference is constructed, which is sent to
the systems guardian actor who actually creates the actor and its context and
puts those inside the reference. Until that has happened, messages sent to the
:class:`ActorRef` will be queued locally, and only upon swapping the real
filling in will they be transferred into the real mailbox. Thus,
.. code-block:: scala
final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang");
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
will probably fail; you will have to allow for some time to pass and retry the
check à la :meth:`TestKit.awaitCond`.

View file

@ -82,13 +82,6 @@ that is used in log messages and for identifying actors. The name must not be em
or start with ``$``. If the given name is already in use by another child to the
same parent actor an `InvalidActorNameException` is thrown.
.. warning::
Creating top-level actors with ``system.actorOf`` is a blocking operation,
hence it may dead-lock due to starvation if the default dispatcher is
overloaded. To avoid problems, do not call this method from within actors or
futures which run on the default dispatcher.
Actors are automatically started asynchronously when created.
When you create the ``UntypedActor`` then it will automatically call the ``preStart``
callback method on the ``UntypedActor`` class. This is an excellent place to

View file

@ -11,7 +11,7 @@ import akka.actor.Props
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.actor.{ Actor, ExtendedActorSystem }
class MyActor extends Actor {
def receive = {
@ -56,20 +56,20 @@ import akka.util.duration._
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MyMessageQueue(o)
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = owner zip system headOption match {
case Some((o, s: ExtendedActorSystem)) new MyMessageQueue(o, s)
case None throw new IllegalArgumentException(
"requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class MyMessageQueue(_owner: ActorContext)
extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem)
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
val storage = new QueueStorage
// A real-world implmentation would use configuration to set the last
// three parameters below
val breaker = CircuitBreaker(_owner.system.scheduler, 5, 30.seconds, 1.minute)
val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute)
def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker {
val data: Array[Byte] = serialize(envelope)
@ -91,7 +91,7 @@ class MyMessageQueue(_owner: ActorContext)
* but the purpose of a durable mailbox is to continue
* with the same message queue when the actor is started again.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
}
//#custom-mailbox

View file

@ -76,13 +76,6 @@ that is used in log messages and for identifying actors. The name must not be em
or start with ``$``. If the given name is already in use by another child to the
same parent actor an `InvalidActorNameException` is thrown.
.. warning::
Creating top-level actors with ``system.actorOf`` is a blocking operation,
hence it may dead-lock due to starvation if the default dispatcher is
overloaded. To avoid problems, do not call this method from within actors or
futures which run on the default dispatcher.
Actors are automatically started asynchronously when created.
When you create the ``Actor`` then it will automatically call the ``preStart``
callback method on the ``Actor`` trait. This is an excellent place to

View file

@ -134,8 +134,8 @@ object DispatcherDocSpec {
}
//#mailbox-implementation-example
case class MyUnboundedMailbox() extends akka.dispatch.MailboxType {
import akka.actor.ActorContext
class MyUnboundedMailbox extends akka.dispatch.MailboxType {
import akka.actor.{ ActorRef, ActorSystem }
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import akka.dispatch.{
@ -149,12 +149,12 @@ object DispatcherDocSpec {
def this(settings: ActorSystem.Settings, config: Config) = this()
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
//#mailbox-implementation-example
}
//#mailbox-implementation-example
}
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {

View file

@ -198,3 +198,25 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail
the configuration which describes the dispatcher using this mailbox type; the
mailbox type will be instantiated once for each dispatcher using it.
Special Semantics of ``system.actorOf``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In order to make ``system.actorOf`` both synchronous and non-blocking while
keeping the return type :class:`ActorRef` (and the semantics that the returned
ref is fully functional), special handling takes place for this case. Behind
the scenes, a hollow kind of actor reference is constructed, which is sent to
the systems guardian actor who actually creates the actor and its context and
puts those inside the reference. Until that has happened, messages sent to the
:class:`ActorRef` will be queued locally, and only upon swapping the real
filling in will they be transferred into the real mailbox. Thus,
.. code-block:: scala
val props: Props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")
will probably fail; you will have to allow for some time to pass and retry the
check à la :meth:`TestKit.awaitCond`.

View file

@ -13,20 +13,22 @@ import akka.actor.ActorSystem
import akka.dispatch._
import akka.util.{ Duration, NonFatal }
import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker }
import akka.actor.ExtendedActorSystem
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new FileBasedMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new FileBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = owner zip system headOption match {
case Some((o, s: ExtendedActorSystem)) new FileBasedMessageQueue(o, s, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings)
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
// TODO Is it reasonable for all FileBasedMailboxes to have their own logger?
private val log = Logging(system, "FileBasedMessageQueue")
val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
val breaker = CircuitBreaker(system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
private val queue = try {
(new java.io.File(settings.QueuePath)) match {
@ -79,5 +81,5 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox
case NonFatal(_) false
}
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
}

View file

@ -13,11 +13,10 @@ private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
}
abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue {
abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue {
import DurableExecutableMailboxConfig._
def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem]
def ownerPath: ActorPath = owner.self.path
def ownerPath: ActorPath = owner.path
val ownerPathString: String = ownerPath.elements.mkString("/")
val name: String = "mailbox_" + Name.replaceAllIn(ownerPathString, "_")
@ -42,7 +41,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
val builder = RemoteMessageProtocol.newBuilder
.setMessage(message)
.setRecipient(serializeActorRef(owner.self))
.setRecipient(serializeActorRef(owner))
.setSender(serializeActorRef(durableMessage.sender))
builder.build.toByteArray
@ -60,7 +59,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
val sender = deserializeActorRef(durableMessage.getSender)
Envelope(message, sender)(system)
Envelope(message, sender, system)
}
}

View file

@ -3,25 +3,21 @@
*/
package akka.actor.mailbox
import DurableMailboxSpecActorFactory.AccumulatorActor
import DurableMailboxSpecActorFactory.MailboxTestActor
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.LocalActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import java.io.InputStream
import java.util.concurrent.TimeoutException
import scala.annotation.tailrec
import org.scalatest.{ WordSpec, BeforeAndAfterAll }
import org.scalatest.matchers.MustMatchers
import com.typesafe.config.{ ConfigFactory, Config }
import DurableMailboxSpecActorFactory.{ MailboxTestActor, AccumulatorActor }
import akka.actor.{ RepointableRef, Props, ActorSystem, ActorRefWithCell, ActorRef, ActorCell, Actor }
import akka.dispatch.Mailbox
import akka.testkit.TestKit
import akka.util.duration.intToDurationInt
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import java.io.InputStream
import java.util.concurrent.TimeoutException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.annotation.tailrec
object DurableMailboxSpecActorFactory {
@ -115,9 +111,15 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
}
def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = id match {
case null | "" system.actorOf(props.withDispatcher(backendName + "-dispatcher"))
case some system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = {
val ref = id match {
case null | "" system.actorOf(props.withDispatcher(backendName + "-dispatcher"))
case some system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
}
awaitCond(ref match {
case r: RepointableRef r.isStarted
}, 1 second, 10 millis)
ref
}
private def isDurableMailbox(m: Mailbox): Boolean =
@ -127,9 +129,11 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
"get a new, unique, durable mailbox" in {
val a1, a2 = createMailboxTestActor()
isDurableMailbox(a1.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
isDurableMailbox(a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
(a1.asInstanceOf[LocalActorRef].underlying.mailbox ne a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
val mb1 = a1.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox
val mb2 = a2.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox
isDurableMailbox(mb1) must be(true)
isDurableMailbox(mb2) must be(true)
(mb1 ne mb2) must be(true)
}
"deliver messages at most once" in {
@ -148,7 +152,7 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
"support having multiple actors at the same time" in {
val actors = Vector.fill(3)(createMailboxTestActor(Props[AccumulatorActor]))
actors foreach { a isDurableMailbox(a.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) }
actors foreach { a isDurableMailbox(a.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox) must be(true) }
val msgs = 1 to 3

View file

@ -36,8 +36,8 @@ private[akka] class RemoteActorRefProvider(
// these are only available after init()
override def rootGuardian: InternalActorRef = local.rootGuardian
override def guardian: InternalActorRef = local.guardian
override def systemGuardian: InternalActorRef = local.systemGuardian
override def guardian: LocalActorRef = local.guardian
override def systemGuardian: LocalActorRef = local.systemGuardian
override def terminationFuture: Promise[Unit] = local.terminationFuture
override def dispatcher: MessageDispatcher = local.dispatcher
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
@ -96,8 +96,8 @@ private[akka] class RemoteActorRefProvider(
}
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = {
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy)
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = {
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
else {
/*
@ -155,14 +155,14 @@ private[akka] class RemoteActorRefProvider(
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) b withFallback a) match {
case d @ Deploy(_, _, _, RemoteScope(addr))
if (addr == rootPath.address || addr == transport.address) {
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false)
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
} else {
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
useActorOnNode(rpath, props, d, supervisor)
new RemoteActorRef(this, transport, rpath, supervisor)
}
case _ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false)
case _ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
}
}
}

View file

@ -5,10 +5,11 @@
package akka.remote
import scala.annotation.tailrec
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
import akka.event.LoggingAdapter
import akka.dispatch.Watch
import akka.actor.ActorRefWithCell
import akka.actor.ActorRefScope
private[akka] sealed trait DaemonMsg
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
@ -60,7 +61,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
val subpath = elems.drop(1)
val path = this.path / subpath
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, false, Some(deploy), true)
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(subpath.mkString("/"), actor)
this.sendSystemMessage(Watch(actor, this))
case _
@ -68,11 +69,12 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
}
}
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal
removeChild(child.path.elements.drop(1).mkString("/"))
case t: Terminated
case t: Terminated
case unknown log.warning("Unknown message {} received by {}", unknown, this)
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}
}

View file

@ -270,14 +270,14 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
}
case x log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
}
case l: LocalRef
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal
if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage)
remoteMessage.payload match {
case msg: PossiblyHarmful if useUntrustedMode log.warning("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass)
case msg: SystemMessage l.sendSystemMessage(msg)
case msg l.!(msg)(remoteMessage.sender)
}
case r: RemoteRef
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal
if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage)
remoteMessage.originalReceiver match {
case AddressFromURIString(address) if address == provider.transport.address
@ -285,7 +285,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
r.!(remoteMessage.payload)(remoteMessage.sender)
case r log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address)
}
case r log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address)
case r log.error("dropping message {} for unknown recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address)
}
}
}

View file

@ -71,7 +71,8 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _re
IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy), false)
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
systemService = false, Some(deploy), lookupDeploy = false, async = false)
})
case (_, xs, _) throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"

View file

@ -118,7 +118,7 @@ akka {
val r = expectMsgType[ActorRef]
r ! (Props[Echo], "grandchild")
val remref = expectMsgType[ActorRef]
remref.isInstanceOf[LocalActorRef] must be(true)
remref.asInstanceOf[ActorRefScope].isLocal must be(true)
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
myref.isInstanceOf[RemoteActorRef] must be(true)
myref ! 43

View file

@ -128,7 +128,7 @@ class CallingThreadDispatcher(
override def id: String = Id
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def createMailbox(actor: akka.actor.Cell) = new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def shutdown() {}
@ -281,17 +281,21 @@ class NestingQueue(val q: MessageQueue) {
def isActive = active
}
class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue {
class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxType)
extends Mailbox(null) with DefaultSystemMessageQueue {
val system = _receiver.system
val self = _receiver.self
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = {
val queue = new NestingQueue(mailboxType.create(Some(actor)))
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
val queue = new NestingQueue(mailboxType.create(Some(self), Some(system)))
CallingThreadDispatcherQueues(system).registerQueue(CallingThreadMailbox.this, queue)
queue
}
}
override def enqueue(receiver: ActorRef, msg: Envelope): Unit = throw new UnsupportedOperationException("CallingThreadMailbox cannot enqueue normally")
override def enqueue(receiver: ActorRef, msg: Envelope): Unit = q.get.q.enqueue(receiver, msg)
override def dequeue(): Envelope = throw new UnsupportedOperationException("CallingThreadMailbox cannot dequeue normally")
override def hasMessages: Boolean = q.get.q.hasMessages
override def numberOfMessages: Int = 0
@ -311,7 +315,7 @@ class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) e
val q = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
super.cleanUp()
q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue)
q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
}
}
}

View file

@ -56,7 +56,7 @@ class TestActorRef[T <: Actor](
* become/unbecome.
*/
def receive(o: Any, sender: ActorRef): Unit = try {
underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender)(underlying.system)
underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender, underlying.system)
underlying.receiveMessage(o)
} finally underlying.currentMessage = null

View file

@ -97,9 +97,14 @@ trait TestKitBase {
*/
lazy val testActor: ActorRef = {
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
impl.systemActorOf(Props(new TestActor(queue))
val ref = impl.systemActorOf(Props(new TestActor(queue))
.withDispatcher(CallingThreadDispatcher.Id),
"testActor" + TestKit.testActorId.incrementAndGet)
awaitCond(ref match {
case r: RepointableRef r.isStarted
case _ true
}, 1 second, 10 millis)
ref
}
private var end: Duration = Duration.Undefined

View file

@ -190,29 +190,24 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
}
@tailrec private def doPoll(mode: PollMsg, togo: Int = 10): Unit =
receiveMessage(mode) match {
case null // receiveMessage has already done something special here
case Seq() doPollTimeout(mode)
case frames
notifyListener(deserializer(frames))
if (togo > 0) doPoll(mode, togo - 1)
else self ! mode
if (togo <= 0) self ! mode
else receiveMessage(mode) match {
case Seq() doPollTimeout(mode)
case frames notifyListener(deserializer(frames)); doPoll(mode, togo - 1)
}
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = {
val result = mode match {
case Poll socket.recv(JZMQ.NOBLOCK)
case PollCareful if (poller.poll(0) > 0) socket.recv(0) else null
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] =
if (mode == PollCareful && (poller.poll(0) <= 0)) {
if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!")
} else {
socket.recv(if (mode == Poll) JZMQ.NOBLOCK else 0) match {
case null /*EAGAIN*/
if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames)
case bytes
val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes)
if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames
}
}
result match {
case null
if (currentFrames.isEmpty) currentFrames
else throw new IllegalStateException("no more frames available while socket.hasReceivedMore==true")
case bytes
val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes)
if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames
}
}
private val listenerOpt = params collectFirst { case Listener(l) l }
private def watchListener(): Unit = listenerOpt foreach context.watch

View file

@ -485,7 +485,7 @@ object Dependency {
object V {
val Camel = "2.8.0"
val Logback = "1.0.4"
val Netty = "3.5.0.Final"
val Netty = "3.5.1.Final"
val Protobuf = "2.4.1"
val ScalaStm = "0.5"
val Scalatest = "1.6.1"

9
repl Normal file
View file

@ -0,0 +1,9 @@
import akka.actor._
import akka.dispatch.{ Future, Promise }
import com.typesafe.config.ConfigFactory
val config=ConfigFactory.parseString("akka.daemonic=on")
val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem]
implicit val ec=sys.dispatcher
import akka.util.duration._
import akka.util.Timeout
implicit val timeout=Timeout(5 seconds)