Merge pull request #794 from akka/wip-2575-race-in-publication-during-actor-construction-ban
#2575 Start actor after it can be resolved with actorFor.
This commit is contained in:
commit
ba5152d42c
8 changed files with 42 additions and 17 deletions
|
|
@ -194,6 +194,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
||||||
/*
|
/*
|
||||||
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
|
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
|
||||||
*/
|
*/
|
||||||
|
def start(): Unit
|
||||||
def resume(causedByFailure: Throwable): Unit
|
def resume(causedByFailure: Throwable): Unit
|
||||||
def suspend(): Unit
|
def suspend(): Unit
|
||||||
def restart(cause: Throwable): Unit
|
def restart(cause: Throwable): Unit
|
||||||
|
|
@ -259,13 +260,16 @@ private[akka] class LocalActorRef private[akka] (
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Safe publication of this class’s fields is guaranteed by mailbox.setActor()
|
* Safe publication of this class’s fields is guaranteed by mailbox.setActor()
|
||||||
* which is called indirectly from actorCell.start() (if you’re wondering why
|
* which is called indirectly from actorCell.init() (if you’re wondering why
|
||||||
* this is at all important, remember that under the JMM final fields are only
|
* this is at all important, remember that under the JMM final fields are only
|
||||||
* frozen at the _end_ of the constructor, but we are publishing “this” before
|
* frozen at the _end_ of the constructor, but we are publishing “this” before
|
||||||
* that is reached).
|
* that is reached).
|
||||||
|
* This means that the result of newActorCell needs to be written to the val
|
||||||
|
* actorCell before we call init and start, since we can start using "this"
|
||||||
|
* object from another thread as soon as we run init.
|
||||||
*/
|
*/
|
||||||
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
|
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
|
||||||
actorCell.start(sendSupervise = true, ThreadLocalRandom.current.nextInt())
|
actorCell.init(ThreadLocalRandom.current.nextInt(), sendSupervise = true)
|
||||||
|
|
||||||
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
||||||
new ActorCell(system, ref, props, supervisor)
|
new ActorCell(system, ref, props, supervisor)
|
||||||
|
|
@ -279,6 +283,11 @@ private[akka] class LocalActorRef private[akka] (
|
||||||
*/
|
*/
|
||||||
override def isTerminated: Boolean = actorCell.isTerminated
|
override def isTerminated: Boolean = actorCell.isTerminated
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the actor after initialization.
|
||||||
|
*/
|
||||||
|
override def start(): Unit = actorCell.start()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Suspends the actor so that it will not process messages until resumed. The
|
* Suspends the actor so that it will not process messages until resumed. The
|
||||||
* suspend request is processed asynchronously to the caller of this method
|
* suspend request is processed asynchronously to the caller of this method
|
||||||
|
|
@ -390,6 +399,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
|
||||||
override def getParent: InternalActorRef = Nobody
|
override def getParent: InternalActorRef = Nobody
|
||||||
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
|
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
|
||||||
|
|
||||||
|
override def start(): Unit = ()
|
||||||
override def suspend(): Unit = ()
|
override def suspend(): Unit = ()
|
||||||
override def resume(causedByFailure: Throwable): Unit = ()
|
override def resume(causedByFailure: Throwable): Unit = ()
|
||||||
override def stop(): Unit = ()
|
override def stop(): Unit = ()
|
||||||
|
|
|
||||||
|
|
@ -516,6 +516,7 @@ class LocalActorRefProvider(
|
||||||
cell.reserveChild("user")
|
cell.reserveChild("user")
|
||||||
val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user")
|
val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user")
|
||||||
cell.initChild(ref)
|
cell.initChild(ref)
|
||||||
|
ref.start()
|
||||||
ref
|
ref
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -524,6 +525,7 @@ class LocalActorRefProvider(
|
||||||
cell.reserveChild("system")
|
cell.reserveChild("system")
|
||||||
val ref = new LocalActorRef(system, Props(new SystemGuardian(systemGuardianStrategy)), rootGuardian, rootPath / "system")
|
val ref = new LocalActorRef(system, Props(new SystemGuardian(systemGuardianStrategy)), rootGuardian, rootPath / "system")
|
||||||
cell.initChild(ref)
|
cell.initChild(ref)
|
||||||
|
ref.start()
|
||||||
ref
|
ref
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,8 +78,10 @@ private[akka] class RepointableActorRef(
|
||||||
* unstarted cell. The cell must be fully functional.
|
* unstarted cell. The cell must be fully functional.
|
||||||
*/
|
*/
|
||||||
def newCell(old: Cell): Cell =
|
def newCell(old: Cell): Cell =
|
||||||
new ActorCell(system, this, props, supervisor)
|
new ActorCell(system, this, props, supervisor).
|
||||||
.start(sendSupervise = false, old.asInstanceOf[UnstartedCell].uid)
|
init(old.asInstanceOf[UnstartedCell].uid, sendSupervise = false).start()
|
||||||
|
|
||||||
|
def start(): Unit = ()
|
||||||
|
|
||||||
def suspend(): Unit = underlying.suspend()
|
def suspend(): Unit = underlying.suspend()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -192,6 +192,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
||||||
// mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
|
// mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
|
||||||
if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend()
|
if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend()
|
||||||
initChild(actor)
|
initChild(actor)
|
||||||
|
actor.start()
|
||||||
actor
|
actor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,12 +38,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
||||||
final def isTerminated: Boolean = mailbox.isClosed
|
final def isTerminated: Boolean = mailbox.isClosed
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start this cell, i.e. attach it to the dispatcher. The UID must reasonably
|
* Initialize this cell, i.e. set up mailboxes and supervision. The UID must be
|
||||||
* be different from the previous UID of a possible actor with the same path,
|
* reasonably different from the previous UID of a possible actor with the same path,
|
||||||
* which can be achieved by using ThreadLocalRandom.current.nextInt().
|
* which can be achieved by using ThreadLocalRandom.current.nextInt().
|
||||||
*/
|
*/
|
||||||
final def start(sendSupervise: Boolean, uid: Int): this.type = {
|
final def init(uid: Int, sendSupervise: Boolean): this.type = {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create the mailbox and enqueue the Create() message to ensure that
|
* Create the mailbox and enqueue the Create() message to ensure that
|
||||||
* this is processed before anything else.
|
* this is processed before anything else.
|
||||||
|
|
@ -59,10 +58,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
||||||
parent.sendSystemMessage(akka.dispatch.Supervise(self, uid))
|
parent.sendSystemMessage(akka.dispatch.Supervise(self, uid))
|
||||||
parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
||||||
}
|
}
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start this cell, i.e. attach it to the dispatcher.
|
||||||
|
*/
|
||||||
|
final def start(): this.type = {
|
||||||
// This call is expected to start off the actor by scheduling its mailbox.
|
// This call is expected to start off the actor by scheduling its mailbox.
|
||||||
dispatcher.attach(this)
|
dispatcher.attach(this)
|
||||||
|
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
|
|
||||||
start(sendSupervise = false, _uid)
|
init(_uid, sendSupervise = false).start()
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* end of construction
|
* end of construction
|
||||||
|
|
|
||||||
|
|
@ -158,8 +158,7 @@ class RemoteActorRefProvider(
|
||||||
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
||||||
} else {
|
} else {
|
||||||
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
|
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
|
||||||
useActorOnNode(rpath, props, d, supervisor)
|
new RemoteActorRef(this, transport, rpath, supervisor, Some(props), Some(d))
|
||||||
new RemoteActorRef(this, transport, rpath, supervisor)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
||||||
|
|
@ -169,12 +168,12 @@ class RemoteActorRefProvider(
|
||||||
|
|
||||||
def actorFor(path: ActorPath): InternalActorRef =
|
def actorFor(path: ActorPath): InternalActorRef =
|
||||||
if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements)
|
if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements)
|
||||||
else new RemoteActorRef(this, transport, path, Nobody)
|
else new RemoteActorRef(this, transport, path, Nobody, props = None, deploy = None)
|
||||||
|
|
||||||
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
||||||
case ActorPathExtractor(address, elems) ⇒
|
case ActorPathExtractor(address, elems) ⇒
|
||||||
if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems)
|
if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems)
|
||||||
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody)
|
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
|
||||||
case _ ⇒ local.actorFor(ref, path)
|
case _ ⇒ local.actorFor(ref, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -213,7 +212,9 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
val provider: RemoteActorRefProvider,
|
val provider: RemoteActorRefProvider,
|
||||||
remote: RemoteTransport,
|
remote: RemoteTransport,
|
||||||
val path: ActorPath,
|
val path: ActorPath,
|
||||||
val getParent: InternalActorRef)
|
val getParent: InternalActorRef,
|
||||||
|
props: Option[Props],
|
||||||
|
deploy: Option[Deploy])
|
||||||
extends InternalActorRef with RemoteRef {
|
extends InternalActorRef with RemoteRef {
|
||||||
|
|
||||||
def getChild(name: Iterator[String]): InternalActorRef = {
|
def getChild(name: Iterator[String]): InternalActorRef = {
|
||||||
|
|
@ -221,7 +222,7 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
s.headOption match {
|
s.headOption match {
|
||||||
case None ⇒ this
|
case None ⇒ this
|
||||||
case Some("..") ⇒ getParent getChild name
|
case Some("..") ⇒ getParent getChild name
|
||||||
case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody)
|
case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody, props = None, deploy = None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -243,6 +244,8 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
provider.deadLetters ! message
|
provider.deadLetters ! message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def start(): Unit = if (props.isDefined && deploy.isDefined) provider.useActorOnNode(path, props.get, deploy.get, getParent)
|
||||||
|
|
||||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||||
|
|
||||||
def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure))
|
def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure))
|
||||||
|
|
@ -253,4 +256,4 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
|
|
||||||
@throws(classOf[java.io.ObjectStreamException])
|
@throws(classOf[java.io.ObjectStreamException])
|
||||||
private def writeReplace(): AnyRef = SerializedActorRef(path)
|
private def writeReplace(): AnyRef = SerializedActorRef(path)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,9 @@ class TestActorRef[T <: Actor](
|
||||||
_supervisor,
|
_supervisor,
|
||||||
_supervisor.path / name) {
|
_supervisor.path / name) {
|
||||||
|
|
||||||
|
// we need to start ourselves since the creation of an actor has been split into initialization and starting
|
||||||
|
underlying.start()
|
||||||
|
|
||||||
import TestActorRef.InternalGetActor
|
import TestActorRef.InternalGetActor
|
||||||
|
|
||||||
override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue