Merge branch 'master' into wip-2413-pdf-adapt-code-samples-ban

This commit is contained in:
Björn Antonsson 2012-10-02 10:22:37 +02:00
commit 086e1a0363
15 changed files with 416 additions and 54 deletions

View file

@ -215,8 +215,9 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
EventFilter[Exception]("hello", occurrences = 1) intercept {
a ! "die"
}
val t = probe.expectMsg(Terminated(a)(true))
val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false))
t.existenceConfirmed must be(true)
t.addressTerminated must be(false)
}
"shut down when /user escalates" in {

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class SwitchSpec extends WordSpec with MustMatchers {
"Switch" must {
"on and off" in {
val s = new Switch(false)
s.isOff must be(true)
s.isOn must be(false)
s.switchOn("hello") must be(true)
s.isOn must be(true)
s.isOff must be(false)
s.switchOn("hello") must be(false)
s.isOn must be(true)
s.isOff must be(false)
s.switchOff("hello") must be(true)
s.isOff must be(true)
s.isOn must be(false)
s.switchOff("hello") must be(false)
s.isOff must be(true)
s.isOn must be(false)
}
"revert when exception" in {
val s = new Switch(false)
intercept[RuntimeException] {
s.switchOn(throw new RuntimeException)
}
s.isOff must be(true)
}
"run action without locking" in {
val s = new Switch(false)
s.ifOffYield("yes") must be(Some("yes"))
s.ifOnYield("no") must be(None)
s.ifOff("yes") must be(true)
s.ifOn("no") must be(false)
s.switchOn()
s.ifOnYield("yes") must be(Some("yes"))
s.ifOffYield("no") must be(None)
s.ifOn("yes") must be(true)
s.ifOff("no") must be(false)
}
"run action with locking" in {
val s = new Switch(false)
s.whileOffYield("yes") must be(Some("yes"))
s.whileOnYield("no") must be(None)
s.whileOff("yes") must be(true)
s.whileOn("no") must be(false)
s.switchOn()
s.whileOnYield("yes") must be(Some("yes"))
s.whileOffYield("no") must be(None)
s.whileOn("yes") must be(true)
s.whileOff("no") must be(false)
}
"run first or second action depending on state" in {
val s = new Switch(false)
s.fold("on")("off") must be("off")
s.switchOn()
s.fold("on")("off") must be("on")
}
"do proper locking" in {
val s = new Switch(false)
s.locked {
Thread.sleep(500)
s.switchOn()
s.isOn must be(true)
}
val latch = new CountDownLatch(1)
new Thread {
override def run(): Unit = {
s.switchOff()
latch.countDown()
}
}.start()
latch.await(5, TimeUnit.SECONDS)
s.isOff must be(true)
}
}
}

View file

@ -65,9 +65,18 @@ case object Kill extends Kill {
* Terminated message can't be forwarded to another actor, since that actor
* might not be watching the subject. Instead, if you need to forward Terminated
* to another actor you should send the information in your own message.
*
* @param actor the watched actor that terminated
* @param existenceConfirmed is false when the Terminated message was not sent
* directly from the watched actor, but derived from another source, such as
* when watching a non-local ActorRef, which might not have been resolved
* @param addressTerminated the Terminated message was derived from
* that the remote node hosting the watched actor was detected as unreachable
*/
@SerialVersionUID(1L)
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
@BeanProperty val existenceConfirmed: Boolean,
@BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage
/**
* INTERNAL API

View file

@ -380,8 +380,10 @@ private[akka] class ActorCell(
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated watchedActorTerminated(t)
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated
if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor)
watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
@ -391,6 +393,18 @@ private[akka] class ActorCell(
}
}
/**
* When a parent is watching a child and it terminates due to AddressTerminated,
* it should be removed to support immediate creation of child with same name.
*
* For remote deployed actors ChildTerminated should be sent to the supervisor
* to clean up child references of remote deployed actors when remote node
* goes down, i.e. triggered by AddressTerminated, but that is the responsibility
* of the ActorRefProvider to handle that scenario.
*/
private def removeChildWhenToAddressTerminated(child: ActorRef): Unit =
childrenRefs.getByRef(child) foreach { crs removeChildAndGetStateChange(crs.child) }
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
/*

View file

@ -10,9 +10,10 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.{ Serialization, JavaSerializer }
import akka.event.EventStream
import scala.annotation.tailrec
import java.util.concurrent.{ ConcurrentHashMap }
import java.util.concurrent.ConcurrentHashMap
import akka.event.LoggingAdapter
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.JavaConverters
/**
* Immutable and serializable handle to an actor, which may or may not reside
@ -442,7 +443,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee == this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
true
case _: Unwatch true // Just ignore
case _ false
@ -467,7 +468,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
override protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee != this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
true
case w: Unwatch true // Just ignore
case NullMessage true
@ -516,4 +517,11 @@ private[akka] class VirtualPathContainer(
}
}
}
def hasChildren: Boolean = !children.isEmpty
def foreachChild(f: ActorRef Unit) = {
val iter = children.values.iterator
while (iter.hasNext) f(iter.next)
}
}

View file

@ -300,6 +300,23 @@ trait ActorRefFactory {
*/
private[akka] case class StopChild(child: ActorRef)
/**
* INTERNAL API
*/
private[akka] object SystemGuardian {
/**
* For the purpose of orderly shutdown it's possible
* to register interest in the termination of systemGuardian
* and receive a notification [[akka.actor.Guardian.TerminationHook]]
* before systemGuardian is stopped. The registered hook is supposed
* to reply with [[akka.actor.Guardian.TerminationHookDone]] and the
* systemGuardian will not stop until all registered hooks have replied.
*/
case object RegisterTerminationHook
case object TerminationHook
case object TerminationHookDone
}
/**
* Local ActorRef provider.
*/
@ -373,10 +390,13 @@ class LocalActorRefProvider(
}
}
private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor {
/*
* Root and user guardian
*/
private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor {
def receive = {
case Terminated(_) if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self)
case Terminated(_) context.stop(self)
case StopChild(child) context.stop(child)
case m deadLetters ! DeadLetter(m, sender, self)
}
@ -385,6 +405,53 @@ class LocalActorRefProvider(
override def preRestart(cause: Throwable, msg: Option[Any]) {}
}
/**
* System guardian
*/
private class SystemGuardian(override val supervisorStrategy: SupervisorStrategy) extends Actor {
import SystemGuardian._
var terminationHooks = Set.empty[ActorRef]
def receive = {
case Terminated(`guardian`)
// time for the systemGuardian to stop, but first notify all the
// termination hooks, they will reply with TerminationHookDone
// and when all are done the systemGuardian is stopped
context.become(terminating)
terminationHooks foreach { _ ! TerminationHook }
stopWhenAllTerminationHooksDone()
case Terminated(a)
// a registered, and watched termination hook terminated before
// termination process of guardian has started
terminationHooks -= a
case StopChild(child) context.stop(child)
case RegisterTerminationHook if sender != context.system.deadLetters
terminationHooks += sender
context watch sender
case m deadLetters ! DeadLetter(m, sender, self)
}
def terminating: Receive = {
case Terminated(a) stopWhenAllTerminationHooksDone(a)
case TerminationHookDone stopWhenAllTerminationHooksDone(sender)
case m deadLetters ! DeadLetter(m, sender, self)
}
def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = {
terminationHooks -= remove
stopWhenAllTerminationHooksDone()
}
def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) {
eventStream.stopDefaultLoggers()
context.stop(self)
}
// guardian MUST NOT lose its children during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {}
}
/*
* The problem is that ActorRefs need a reference to the ActorSystem to
* provide their service. Hence they cannot be created while the
@ -435,7 +502,7 @@ class LocalActorRefProvider(
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
lazy val rootGuardian: LocalActorRef =
new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" tempContainer
@ -447,7 +514,7 @@ class LocalActorRefProvider(
lazy val guardian: LocalActorRef = {
val cell = rootGuardian.underlying
cell.reserveChild("user")
val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user")
val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user")
cell.initChild(ref)
ref
}
@ -455,7 +522,7 @@ class LocalActorRefProvider(
lazy val systemGuardian: LocalActorRef = {
val cell = rootGuardian.underlying
cell.reserveChild("system")
val ref = new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system")
val ref = new LocalActorRef(system, Props(new SystemGuardian(systemGuardianStrategy)), rootGuardian, rootPath / "system")
cell.initChild(ref)
ref
}

View file

@ -49,7 +49,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
try {
watchedBy foreach {
watcher
@ -118,7 +118,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
// existenceConfirmed = false because we could have been watching a
// non-local ActorRef that had never resolved before the other node went down
for (a watching; if a.path.address == address) {
self ! Terminated(a)(existenceConfirmed = false)
self ! Terminated(a)(existenceConfirmed = false, addressTerminated = true)
}
}

View file

@ -261,7 +261,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
case _: Terminate stop()
case Watch(watchee, watcher)
if (watchee == this && watcher != this) {
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false)
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
case Unwatch(watchee, watcher)
if (watchee == this && watcher != this) remWatcher(watcher)
@ -280,7 +280,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
result tryComplete Failure(new ActorKilledException("Stopped"))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(existenceConfirmed = true)
val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
}
}

View file

@ -115,7 +115,7 @@ class Switch(startAsOn: Boolean = false) {
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOff(action: Unit): Boolean = synchronized {
if (switch.get) {
if (!switch.get) {
action
true
} else false

View file

@ -5,19 +5,30 @@ package akka.cluster
import com.typesafe.config.Config
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.actor.Deploy
import akka.actor.DynamicAccess
import akka.actor.InternalActorRef
import akka.actor.NoScopeGiven
import akka.actor.Props
import akka.actor.Scheduler
import akka.actor.Scope
import akka.actor.Terminated
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.dispatch.ChildTerminated
import akka.event.EventStream
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig
import akka.cluster.routing.ClusterRouterSettings
/**
* INTERNAL API
*/
class ClusterActorRefProvider(
_systemName: String,
_settings: ActorSystem.Settings,
@ -26,10 +37,55 @@ class ClusterActorRefProvider(
_dynamicAccess: DynamicAccess) extends RemoteActorRefProvider(
_systemName, _settings, _eventStream, _scheduler, _dynamicAccess) {
@volatile private var remoteDeploymentWatcher: ActorRef = _
override def init(system: ActorSystemImpl): Unit = {
super.init(system)
remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher")
}
override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* This method is overridden here to keep track of remote deployed actors to
* be able to clean up corresponding child references.
*/
override def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = {
super.useActorOnNode(path, props, deploy, supervisor)
remoteDeploymentWatcher ! (actorFor(path), supervisor)
}
}
/**
* INTERNAL API
*
* Responsible for cleaning up child references of remote deployed actors when remote node
* goes down (jvm crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]].
*/
private[akka] class RemoteDeploymentWatcher extends Actor {
var supervisors = Map.empty[ActorRef, InternalActorRef]
def receive = {
case (a: ActorRef, supervisor: InternalActorRef)
supervisors += (a -> supervisor)
context.watch(a)
case t @ Terminated(a) if supervisors isDefinedAt a
// send extra ChildTerminated to the supervisor so that it will remove the child
if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a))
supervisors -= a
case _: Terminated
}
}
/**
* INTERNAL API
*
* Deployer of cluster aware routers.
*/
private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) {
override def parseConfig(path: String, config: Config): Option[Deploy] = {
super.parseConfig(path, config) match {

View file

@ -14,6 +14,9 @@ import akka.actor.Address
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.actor.Address
import akka.remote.RemoteActorRef
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystemImpl
object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -22,6 +25,12 @@ object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
val fourth = role("fourth")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
deployOn(fourth, """/hello.remote = "@first@" """)
class Hello extends Actor {
def receive = Actor.emptyBehavior
}
}
class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec
@ -114,5 +123,41 @@ abstract class ClusterDeathWatchSpec
enterBarrier("after-3")
}
"be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in {
runOn(fourth) {
val hello = system.actorOf(Props[Hello], "hello")
hello.isInstanceOf[RemoteActorRef] must be(true)
hello.path.address must be(address(first))
watch(hello)
enterBarrier("hello-deployed")
markNodeAsUnavailable(first)
val t = expectMsgType[Terminated]
t.actor must be(hello)
enterBarrier("first-unavailable")
system.shutdown()
val timeout = remaining
try system.awaitTermination(timeout) catch {
case _: TimeoutException
fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
system.asInstanceOf[ActorSystemImpl].printTree))
}
}
runOn(first, second, third) {
enterBarrier("hello-deployed")
enterBarrier("first-unavailable")
runOn(first) {
// fourth system will be shutdown, remove to not participate in barriers any more
testConductor.removeNode(fourth)
}
enterBarrier("after-4")
}
}
}
}

View file

@ -12,7 +12,7 @@ import akka.actor._
import akka.util.Timeout
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.remote.RemoteActorRefProvider
import akka.testkit.TestKit
import akka.testkit._
import scala.concurrent.{ Await, Awaitable }
import scala.util.control.NonFatal
import scala.concurrent.util.Duration
@ -246,14 +246,23 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
}
}
system.shutdown()
try system.awaitTermination(5 seconds) catch {
val shutdownTimeout = 5.seconds.dilated
try system.awaitTermination(shutdownTimeout) catch {
case _: TimeoutException
system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
println(system.asInstanceOf[ActorSystemImpl].printTree)
val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout,
system.asInstanceOf[ActorSystemImpl].printTree)
if (verifySystemShutdown) throw new RuntimeException(msg)
else system.log.warning(msg)
}
atTermination()
}
/**
* Override this and return `true` to assert that the
* shutdown of the `ActorSystem` was done properly.
*/
def verifySystemShutdown: Boolean = false
/*
* Test Class Interface
*/

View file

@ -3,14 +3,17 @@
*/
package akka.remote
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.pattern.ask
import testkit.{STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec}
import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.testkit._
import akka.actor.Terminated
import scala.concurrent.util.duration._
import com.typesafe.config.ConfigFactory
object NewRemoteActorMultiJvmSpec extends MultiNodeConfig {
@ -20,12 +23,16 @@ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig {
}
}
commonConfig(debugConfig(on = false))
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off")))
val master = role("master")
val slave = role("slave")
deployOn(master, """/service-hello.remote = "@slave@" """)
deployOn(master, """
/service-hello.remote = "@slave@"
/service-hello3.remote = "@slave@"
""")
deployOnAll("""/service-hello2.remote = "@slave@" """)
}
@ -37,7 +44,10 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import NewRemoteActorMultiJvmSpec._
def initialParticipants = 2
def initialParticipants = roles.size
// ensure that system shutdown is successful
override def verifySystemShutdown = true
"A new remote actor" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
@ -45,14 +55,11 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
runOn(master) {
val actor = system.actorOf(Props[SomeActor], "service-hello")
actor.isInstanceOf[RemoteActorRef] must be(true)
actor.path.address must be(node(slave).address)
val slaveAddress = testConductor.getAddressFor(slave).await
actor ! "identify"
expectMsgType[ActorRef].path.address must equal(slaveAddress)
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
}
enterBarrier("done")
@ -63,17 +70,37 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
runOn(master) {
val actor = system.actorOf(Props[SomeActor], "service-hello2")
actor.isInstanceOf[RemoteActorRef] must be(true)
actor.path.address must be(node(slave).address)
val slaveAddress = testConductor.getAddressFor(slave).await
actor ! "identify"
expectMsgType[ActorRef].path.address must equal(slaveAddress)
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
}
enterBarrier("done")
}
"be able to shutdown system when using remote deployed actor" taggedAs LongRunningTest in within(10 seconds) {
runOn(master) {
val actor = system.actorOf(Props[SomeActor], "service-hello3")
actor.isInstanceOf[RemoteActorRef] must be(true)
actor.path.address must be(node(slave).address)
watch(actor)
enterBarrier("deployed")
// master system is supposed to be shutdown after slave
// this should be triggered by slave system shutdown
expectMsgPF(remaining) { case Terminated(`actor`) true }
}
runOn(slave) {
enterBarrier("deployed")
}
// Important that this is the last test.
// It must not be any barriers here.
// verifySystemShutdown = true will ensure that system shutdown is successful
}
}
}

View file

@ -5,11 +5,12 @@
package akka.remote
import scala.annotation.tailrec
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated }
import akka.event.LoggingAdapter
import akka.dispatch.Watch
import akka.actor.ActorRefWithCell
import akka.actor.ActorRefScope
import akka.util.Switch
private[akka] sealed trait DaemonMsg
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
@ -24,6 +25,14 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str
private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
extends VirtualPathContainer(system.provider, _path, _parent, _log) {
import akka.actor.SystemGuardian._
private val terminating = new Switch(false)
system.provider.systemGuardian.tell(RegisterTerminationHook, this)
system.eventStream.subscribe(this, classOf[AddressTerminated])
/**
* Find the longest matching path which we know about and return that ref
* (or ask that ref to continue searching if elements are left).
@ -60,21 +69,40 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val path = this.path / subpath
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(subpath.mkString("/"), actor)
this.sendSystemMessage(Watch(actor, this))
val isTerminating = !terminating.whileOff {
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(subpath.mkString("/"), actor)
actor.sendSystemMessage(Watch(actor, this))
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)
case _
log.error("remote path does not match path from message [{}]", message)
}
}
case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal
removeChild(child.path.elements.drop(1).mkString("/"))
terminating.locked {
removeChild(child.path.elements.drop(1).mkString("/"))
terminationHookDoneWhenNoChildren()
}
case t: Terminated
case unknown log.warning("Unknown message {} received by {}", unknown, this)
case TerminationHook
terminating.switchOn {
terminationHookDoneWhenNoChildren()
foreachChild { system.stop }
}
case AddressTerminated(address)
foreachChild { case a: InternalActorRef if a.getParent.path.address == address system.stop(a) }
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}
def terminationHookDoneWhenNoChildren(): Unit = terminating.whileOn {
if (!hasChildren) system.provider.systemGuardian.tell(TerminationHookDone, this)
}
}

View file

@ -20,6 +20,7 @@ import akka.actor.{ DeadLetter, Address, ActorRef }
import akka.util.Switch
import scala.util.control.NonFatal
import org.jboss.netty.handler.ssl.SslHandler
import scala.concurrent.util.Deadline
/**
* This is the abstract baseclass for netty remote clients, currently there's only an
@ -106,7 +107,7 @@ private[akka] class ActiveRemoteClient private[akka] (
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var reconnectionTimeWindowStart = 0L
private var reconnectionDeadline: Option[Deadline] = None
def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg)
@ -208,20 +209,18 @@ private[akka] class ActiveRemoteClient private[akka] (
log.debug("[{}] has been shut down", name)
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
if (reconnectionTimeWindowStart == 0L) {
reconnectionTimeWindowStart = System.currentTimeMillis
private[akka] def isWithinReconnectionTimeWindow: Boolean = reconnectionDeadline match {
case None
reconnectionDeadline = Some(Deadline.now + settings.ReconnectionTimeWindow)
true
} else {
val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
if (timeLeft)
log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
timeLeft
}
case Some(deadline)
val hasTimeLeft = deadline.hasTimeLeft
if (hasTimeLeft)
log.info("Will try to reconnect to remote server for another [{}] milliseconds", deadline.timeLeft.toMillis)
hasTimeLeft
}
private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L
private[akka] def resetReconnectionTimeWindow = reconnectionDeadline = None
}
@ChannelHandler.Sharable