From 4606612bd1f46b70fcca0371816ab2d78f38d307 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 15 Apr 2013 09:26:51 +0200 Subject: [PATCH] Reliable remote supervision and death watch, see #2993 * RemoteWatcher that monitors node failures, with heartbeats and failure detector * Move RemoteDeploymentWatcher from CARP to RARP * ClusterRemoteWatcher that handles cluster nodes * Update documentation * UID in Heartbeat msg to be able to quarantine, actual implementation of quarantining will be implemented in ticket 2594 --- .../cluster/ClusterActorRefProvider.scala | 64 +--- .../scala/akka/cluster/ClusterEvent.scala | 10 +- .../akka/cluster/ClusterRemoteWatcher.scala | 97 +++++ .../akka/cluster/ClusterDeathWatchSpec.scala | 76 +++- akka-docs/rst/cluster/cluster-usage-java.rst | 6 - akka-docs/rst/cluster/cluster-usage-scala.rst | 6 - akka-docs/rst/java/remoting.rst | 119 ++++-- akka-docs/rst/java/routing.rst | 3 - akka-docs/rst/scala/remoting.rst | 120 ++++-- akka-docs/rst/scala/routing.rst | 3 - .../RemoteDeploymentDeathWatchSpec.scala | 110 ++++++ .../remote/RemoteNodeDeathWatchSpec.scala | 357 ++++++++++++++++++ akka-remote/src/main/resources/reference.conf | 75 +++- .../akka/remote/RemoteActorRefProvider.scala | 66 +++- .../akka/remote/RemoteDeploymentWatcher.scala | 42 +++ .../scala/akka/remote/RemoteSettings.scala | 16 + .../scala/akka/remote/RemoteWatcher.scala | 311 +++++++++++++++ .../transport/AkkaProtocolTransport.scala | 27 +- .../scala/akka/remote/RemoteConfigSpec.scala | 22 +- .../scala/akka/remote/RemoteWatcherSpec.scala | 352 +++++++++++++++++ .../remote/transport/AkkaProtocolSpec.scala | 2 +- .../transport/AkkaProtocolStressTest.scala | 2 +- 22 files changed, 1699 insertions(+), 187 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala create mode 100644 akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala create mode 100644 akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala create mode 100644 akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 89c29503f7..f97057f5f4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -5,16 +5,12 @@ package akka.cluster import com.typesafe.config.Config import akka.ConfigurationException -import akka.actor.Actor -import akka.actor.ActorPath -import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.ActorSystemImpl import akka.actor.Deploy import akka.actor.DynamicAccess import akka.actor.InternalActorRef import akka.actor.NoScopeGiven -import akka.actor.Props import akka.actor.Scheduler import akka.actor.Scope import akka.actor.Terminated @@ -34,6 +30,9 @@ import akka.cluster.routing.HeapMetricsSelector import akka.cluster.routing.SystemLoadAverageMetricsSelector import akka.cluster.routing.CpuMetricsSelector import akka.cluster.routing.MetricsSelector +import akka.dispatch.sysmsg.SystemMessage +import akka.actor.ActorRef +import akka.actor.Props /** * INTERNAL API @@ -49,15 +48,25 @@ private[akka] class ClusterActorRefProvider( _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( _systemName, _settings, _eventStream, _dynamicAccess) { - @volatile private var remoteDeploymentWatcher: ActorRef = _ - override def init(system: ActorSystemImpl): Unit = { super.init(system) // initialize/load the Cluster extension Cluster(system) + } - remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher") + override protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = { + // make sure Cluster extension is initialized/loaded from init thread + Cluster(system) + + import remoteSettings._ + val failureDetector = createRemoteWatcherFailureDetector(system) + system.systemActorOf(ClusterRemoteWatcher.props( + failureDetector, + heartbeatInterval = WatchHeartBeatInterval, + unreachableReaperInterval = WatchUnreachableReaperInterval, + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, + numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher") } /** @@ -66,47 +75,6 @@ private[akka] class ClusterActorRefProvider( */ override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess) - /** - * This method is overridden here to keep track of remote deployed actors to - * be able to clean up corresponding child references. - */ - override def useActorOnNode(ref: ActorRef, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { - super.useActorOnNode(ref, props, deploy, supervisor) - import RemoteDeploymentWatcher.WatchRemote - remoteDeploymentWatcher ! WatchRemote(ref, supervisor) - } - -} - -/** - * INTERNAL API - */ -private[akka] object RemoteDeploymentWatcher { - case class WatchRemote(actor: ActorRef, supervisor: ActorRef) -} - -/** - * INTERNAL API - * - * Responsible for cleaning up child references of remote deployed actors when remote node - * goes down (jvm crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]]. - */ -private[akka] class RemoteDeploymentWatcher extends Actor { - import RemoteDeploymentWatcher._ - var supervisors = Map.empty[ActorRef, InternalActorRef] - - def receive = { - case WatchRemote(a, supervisor: InternalActorRef) ⇒ - supervisors += (a -> supervisor) - context.watch(a) - - case t @ Terminated(a) if supervisors isDefinedAt a ⇒ - // send extra DeathWatchNotification to the supervisor so that it will remove the child - supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) - supervisors -= a - - case _: Terminated ⇒ - } } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index f65c4fe2be..8c4bc77b2e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -310,15 +310,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestGossip = newGossip // first publish the diffUnreachable between the last two gossips diffUnreachable(oldGossip, newGossip) foreach publish - diffMemberEvents(oldGossip, newGossip) foreach { event ⇒ - event match { - case MemberRemoved(m) ⇒ - publish(event) - // notify DeathWatch about downed node - publish(AddressTerminated(m.address)) - case _ ⇒ publish(event) - } - } + diffMemberEvents(oldGossip, newGossip) foreach publish diffLeader(oldGossip, newGossip) foreach publish diffRolesLeader(oldGossip, newGossip) foreach publish // publish internal SeenState for testing purposes diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala new file mode 100644 index 0000000000..fd0d6fee96 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import scala.concurrent.duration.FiniteDuration +import akka.actor.Actor +import akka.actor.Address +import akka.actor.Props +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent.MemberRemoved +import akka.remote.FailureDetectorRegistry +import akka.remote.RemoteWatcher + +private[cluster] object ClusterRemoteWatcher { + /** + * Factory method for `ClusterRemoteWatcher` [[akka.actor.Props]]. + */ + def props( + failureDetector: FailureDetectorRegistry[Address], + heartbeatInterval: FiniteDuration, + unreachableReaperInterval: FiniteDuration, + heartbeatExpectedResponseAfter: FiniteDuration, + numberOfEndHeartbeatRequests: Int): Props = + Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, + heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) +} + +/** + * INTERNAL API + * + * Specialization of [[akka.remote.RemoteWatcher]] that keeps + * track of cluster member nodes and is responsible for watchees on cluster nodes. + * [[akka.actor.AddressTerminate]] is published when node is removed from cluster. + * + * `RemoteWatcher` handles non-cluster nodes. `ClusterRemoteWatcher` will take + * over responsibility from `RemoteWatcher` if a watch is added before a node is member + * of the cluster and then later becomes cluster member. + */ +private[cluster] class ClusterRemoteWatcher( + failureDetector: FailureDetectorRegistry[Address], + heartbeatInterval: FiniteDuration, + unreachableReaperInterval: FiniteDuration, + heartbeatExpectedResponseAfter: FiniteDuration, + numberOfEndHeartbeatRequests: Int) + extends RemoteWatcher( + failureDetector, + heartbeatInterval, + unreachableReaperInterval, + heartbeatExpectedResponseAfter, + numberOfEndHeartbeatRequests) { + + import RemoteWatcher._ + + var clusterNodes: Set[Address] = Set.empty + + override def preStart(): Unit = { + super.preStart() + Cluster(context.system).subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + super.postStop() + Cluster(context.system).unsubscribe(self) + } + + override def receive = receiveClusterEvent orElse super.receive + + def receiveClusterEvent: Actor.Receive = { + case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒ + () // cluster managed node, don't propagate to super + case state: CurrentClusterState ⇒ + clusterNodes = state.members.map(_.address) + clusterNodes foreach takeOverResponsibility + case MemberUp(m) ⇒ + clusterNodes += m.address + takeOverResponsibility(m.address) + case MemberRemoved(m) ⇒ + clusterNodes -= m.address + publishAddressTerminated(m.address) + } + + /** + * When a cluster node is added this class takes over the + * responsibility for watchees on that node already handled + * by super RemoteWatcher. + */ + def takeOverResponsibility(address: Address): Unit = { + watching foreach { + case (watchee, watcher) ⇒ if (watchee.path.address == address) + unwatchRemote(watchee, watcher) + } + } + +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index b4d3430de0..71d7dad389 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -22,12 +22,15 @@ import java.util.concurrent.TimeoutException import akka.actor.ActorSystemImpl import akka.actor.ActorIdentity import akka.actor.Identify +import akka.actor.ActorRef +import akka.remote.RemoteWatcher object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") + val fifth = role("fifth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) @@ -42,10 +45,11 @@ class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec +class ClusterDeathWatchMultiJvmNode5 extends ClusterDeathWatchSpec abstract class ClusterDeathWatchSpec extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec) - with MultiNodeClusterSpec { + with MultiNodeClusterSpec with ImplicitSender { import ClusterDeathWatchMultiJvmSpec._ @@ -57,9 +61,14 @@ abstract class ClusterDeathWatchSpec } } + lazy val remoteWatcher: ActorRef = { + system.actorSelection("/system/remote-watcher") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + "An actor watching a remote actor in the cluster" must { "receive Terminated when watched node becomes Down" taggedAs LongRunningTest in within(20 seconds) { - awaitClusterUp(roles: _*) + awaitClusterUp(first, second, third, fourth) enterBarrier("cluster-up") runOn(first) { @@ -116,6 +125,13 @@ abstract class ClusterDeathWatchSpec enterBarrier("third-terminated") } + runOn(fifth) { + enterBarrier("subjected-started") + enterBarrier("watch-established") + enterBarrier("second-terminated") + enterBarrier("third-terminated") + } + enterBarrier("after-1") } @@ -136,6 +152,58 @@ abstract class ClusterDeathWatchSpec enterBarrier("after-2") } + "be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) { + runOn(fifth) { + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject5") + } + enterBarrier("subjected-started") + + runOn(first) { + system.actorSelection(RootActorPath(fifth) / "user" / "subject5") ! Identify("subject5") + val subject5 = expectMsgType[ActorIdentity].ref.get + watch(subject5) + + // fifth is not cluster member, so the watch is handled by the RemoteWatcher + awaitAssert { + remoteWatcher ! RemoteWatcher.Stats + expectMsgType[RemoteWatcher.Stats].watchingRefs must contain((subject5, testActor)) + } + } + enterBarrier("remote-watch") + + // second and third are already removed + awaitClusterUp(first, fourth, fifth) + + runOn(first) { + // fifth is member, so the watch is handled by the ClusterRemoteWatcher, + // and cleaned up from RemoteWatcher + awaitAssert { + remoteWatcher ! RemoteWatcher.Stats + expectMsgType[RemoteWatcher.Stats].watchingRefs.map { + case (watchee, watcher) ⇒ watchee.path.name + } must not contain ("subject5") + } + } + + enterBarrier("cluster-watch") + + runOn(fourth) { + markNodeAsUnavailable(fifth) + awaitAssert(clusterView.members.map(_.address) must not contain (address(fifth))) + awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(fifth))) + cluster.down(fifth) + // removed + awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(fifth))) + } + + enterBarrier("fifth-terminated") + runOn(first) { + expectMsgType[Terminated].actor.path.name must be("subject5") + } + + enterBarrier("after-3") + } + "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) { runOn(fourth) { val hello = system.actorOf(Props[Hello], "hello") @@ -164,7 +232,7 @@ abstract class ClusterDeathWatchSpec } } - runOn(first, second, third) { + runOn(first, second, third, fifth) { enterBarrier("hello-deployed") enterBarrier("first-unavailable") runOn(first) { @@ -172,7 +240,7 @@ abstract class ClusterDeathWatchSpec testConductor.removeNode(fourth) } - enterBarrier("after-3") + enterBarrier("after-4") } } diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index 9d924e79d1..e4621930c2 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -345,12 +345,6 @@ Death watch uses the cluster failure detector for nodes in the cluster, i.e. it generates ``Terminated`` message from network failures and JVM crashes, in addition to graceful termination of watched actor. -.. warning:: - - Creating a remote deployed child actor with the same name as the terminated - actor is not fully supported. There is a race condition that potentially removes the new - actor. - Cluster Aware Routers ^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 0da45fb27a..3c3be1a4f5 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -334,12 +334,6 @@ Death watch uses the cluster failure detector for nodes in the cluster, i.e. it generates ``Terminated`` message from network failures and JVM crashes, in addition to graceful termination of watched actor. -.. warning:: - - Creating a remote deployed child actor with the same name as the terminated - actor is not fully supported. There is a race condition that potentially removes the new - actor. - .. _cluster_aware_routers_scala: Cluster Aware Routers diff --git a/akka-docs/rst/java/remoting.rst b/akka-docs/rst/java/remoting.rst index 17286eb4ee..8c2b004f3d 100644 --- a/akka-docs/rst/java/remoting.rst +++ b/akka-docs/rst/java/remoting.rst @@ -48,24 +48,8 @@ As you can see in the example above there are four things you need to add to get systems have different names. This is because each actor system has its own network subsystem listening for connections and handling messages as not to interfere with other actor systems. -.. _remoting-java-configuration: - -Remote Configuration -^^^^^^^^^^^^^^^^^^^^ - The example above only illustrates the bare minimum of properties you have to add to enable remoting. -There are lots of more properties that are related to remoting in Akka. We refer to the following -reference file for more information: - -.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf - :language: none - -.. note:: - - Setting properties like the listening IP and port number programmatically is - best done by using something like the following: - - .. includecode:: code/docs/remoting/RemoteDeploymentDocTestBase.java#programmatic +All settings are described in :ref:`remote-configuration-java`. Looking up Remote Actors ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -138,22 +122,6 @@ actor systems has to have a JAR containing the class. ``/foo/bar`` is considered **more specific** than ``/foo/*`` and only the highest priority match is used. Please note that it **cannot** be used to partially match section, like this: ``/foo*/bar``, ``/f*o/bar`` etc. -.. _remote-deployment-warnings-java: - -.. warning:: - - *Caveat:* Remote deployment ties both systems together in a tight fashion, - where it may become impossible to shut down one system after the other has - become unreachable. This is due to a missing feature—which will be part of - the clustering support—that hooks up network failure detection with - DeathWatch. If you want to avoid this strong coupling, do not remote-deploy - but send ``Props`` to a remotely looked-up actor and have that create a - child, returning the resulting actor reference. - -.. warning:: - - *Caveat:* Akka Remoting does not trigger Death Watch for lost connections. - Programmatic Remote Deployment ------------------------------ @@ -175,6 +143,72 @@ you can advise the system to create a child on that remote node like so: .. includecode:: code/docs/remoting/RemoteDeploymentDocTestBase.java#deploy +Watching Remote Actors +^^^^^^^^^^^^^^^^^^^^^^ + +Watching a remote actor is not different than watching a local actor, as described in +:ref:`deathwatch-java`. + +.. warning:: + + *Caveat:* Watching an ``ActorRef`` acquired with ``actorFor`` does not trigger + ``Terminated`` for lost connections. ``actorFor`` is deprecated in favor of + ``actorSelection``. Acquire the ``ActorRef`` to watch with ``Identify`` and + ``ActorIdentity`` as described in :ref:`actorSelection-java`. + +Failure Detector +---------------- + +Under the hood remote death watch uses heartbeat messages and a failure detector to generate ``Terminated`` +message from network failures and JVM crashes, in addition to graceful termination of watched +actor. + +The heartbeat arrival times is interpreted by an implementation of +`The Phi Accrual Failure Detector `_. + +The suspicion level of failure is given by a value called *phi*. +The basic idea of the phi failure detector is to express the value of *phi* on a scale that +is dynamically adjusted to reflect current network conditions. + +The value of *phi* is calculated as:: + + phi = -log10(1 - F(timeSinceLastHeartbeat)) + +where F is the cumulative distribution function of a normal distribution with mean +and standard deviation estimated from historical heartbeat inter-arrival times. + +In the :ref:`remote-configuration-java` you can adjust the ``akka.remote.watch-failure-detector.threshold`` +to define when a *phi* value is considered to be a failure. + +A low ``threshold`` is prone to generate many false positives but ensures +a quick detection in the event of a real crash. Conversely, a high ``threshold`` +generates fewer mistakes but needs more time to detect actual crashes. The +default ``threshold`` is 10 and is appropriate for most situations. However in +cloud environments, such as Amazon EC2, the value could be increased to 12 in +order to account for network issues that sometimes occur on such platforms. + +The following chart illustrates how *phi* increase with increasing time since the +previous heartbeat. + +.. image:: ../cluster/images/phi1.png + +Phi is calculated from the mean and standard deviation of historical +inter arrival times. The previous chart is an example for standard deviation +of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, +i.e. it is possible to determine failure more quickly. The curve looks like this for +a standard deviation of 100 ms. + +.. image:: ../cluster/images/phi2.png + +To be able to survive sudden abnormalities, such as garbage collection pauses and +transient network failures the failure detector is configured with a margin, +``akka.remote.watch-failure-detector.acceptable-heartbeat-pause``. You may want to +adjust the :ref:`remote-configuration-java` of this depending on you environment. +This is how the curve looks like for ``acceptable-heartbeat-pause`` configured to +3 seconds. + +.. image:: ../cluster/images/phi3.png + Serialization ^^^^^^^^^^^^^ @@ -487,7 +521,7 @@ SSL SSL can be used as the remote transport by adding ``akka.remote.netty.ssl`` to the ``enabled-transport`` configuration section. See a description of the settings -in the :ref:`remoting-java-configuration` section. +in the :ref:`remote-configuration-java` section. The SSL support is implemented with Java Secure Socket Extension, please consult the offical `Java Secure Socket Extension documentation `_ @@ -500,3 +534,20 @@ and related resources for troubleshooting. Use '/dev/./urandom', not '/dev/urandom' as that doesn't work according to `Bug ID: 6202721 `_. +.. _remote-configuration-java: + +Remote Configuration +^^^^^^^^^^^^^^^^^^^^ + +There are lots of configuration properties that are related to remoting in Akka. We refer to the following +reference file for more information: + +.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf + :language: none + +.. note:: + + Setting properties like the listening IP and port number programmatically is + best done by using something like the following: + + .. includecode:: code/docs/remoting/RemoteDeploymentDocTestBase.java#programmatic diff --git a/akka-docs/rst/java/routing.rst b/akka-docs/rst/java/routing.rst index d14f57a706..1689acada8 100644 --- a/akka-docs/rst/java/routing.rst +++ b/akka-docs/rst/java/routing.rst @@ -89,9 +89,6 @@ There are a few gotchas to be aware of when creating routers: :class:`Props`, as it does not need to create routees. However, if you use a :ref:`resizable router ` then the routee :class:`Props` will be used whenever the resizer creates new routees. -* The same issues that apply to remotely-deployed actors also apply to remotely-deployed routees. - Read about :ref:`the limitations of remote deployment ` for - more information. Routers, Routees and Senders **************************** diff --git a/akka-docs/rst/scala/remoting.rst b/akka-docs/rst/scala/remoting.rst index 72811f531d..dd4f945db5 100644 --- a/akka-docs/rst/scala/remoting.rst +++ b/akka-docs/rst/scala/remoting.rst @@ -45,24 +45,8 @@ As you can see in the example above there are four things you need to add to get systems have different names. This is because each actor system has its own networking subsystem listening for connections and handling messages as not to interfere with other actor systems. -.. _remoting-scala-configuration: - -Remote Configuration -^^^^^^^^^^^^^^^^^^^^ - The example above only illustrates the bare minimum of properties you have to add to enable remoting. -There are lots of more properties that are related to remoting in Akka. We refer to the following -reference file for more information: - -.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf - :language: none - -.. note:: - - Setting properties like the listening IP and port number programmatically is - best done by using something like the following: - - .. includecode:: ../java/code/docs/remoting/RemoteDeploymentDocTestBase.java#programmatic +All settings are described in :ref:`remote-configuration-scala`. Types of Remote Interaction ^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -145,22 +129,6 @@ actor systems has to have a JAR containing the class. ``/foo/bar`` is considered **more specific** than ``/foo/*`` and only the highest priority match is used. Please note that it **cannot** be used to partially match section, like this: ``/foo*/bar``, ``/f*o/bar`` etc. -.. _remote-deployment-warnings-scala: - -.. warning:: - - *Caveat:* Remote deployment ties both systems together in a tight fashion, - where it may become impossible to shut down one system after the other has - become unreachable. This is due to a missing feature—which will be part of - the clustering support—that hooks up network failure detection with - DeathWatch. If you want to avoid this strong coupling, do not remote-deploy - but send ``Props`` to a remotely looked-up actor and have that create a - child, returning the resulting actor reference. - -.. warning:: - - *Caveat:* Akka Remoting does not trigger Death Watch for lost connections. - Programmatic Remote Deployment ------------------------------ @@ -182,6 +150,72 @@ you can advise the system to create a child on that remote node like so: .. includecode:: code/docs/remoting/RemoteDeploymentDocSpec.scala#deploy +Watching Remote Actors +^^^^^^^^^^^^^^^^^^^^^^ + +Watching a remote actor is not different than watching a local actor, as described in +:ref:`deathwatch-scala`. + +.. warning:: + + *Caveat:* Watching an ``ActorRef`` acquired with ``actorFor`` does not trigger + ``Terminated`` for lost connections. ``actorFor`` is deprecated in favor of + ``actorSelection``. Acquire the ``ActorRef`` to watch with ``Identify`` and + ``ActorIdentity`` as described in :ref:`actorSelection-scala`. + +Failure Detector +---------------- + +Under the hood remote death watch uses heartbeat messages and a failure detector to generate ``Terminated`` +message from network failures and JVM crashes, in addition to graceful termination of watched +actor. + +The heartbeat arrival times is interpreted by an implementation of +`The Phi Accrual Failure Detector `_. + +The suspicion level of failure is given by a value called *phi*. +The basic idea of the phi failure detector is to express the value of *phi* on a scale that +is dynamically adjusted to reflect current network conditions. + +The value of *phi* is calculated as:: + + phi = -log10(1 - F(timeSinceLastHeartbeat)) + +where F is the cumulative distribution function of a normal distribution with mean +and standard deviation estimated from historical heartbeat inter-arrival times. + +In the :ref:`remote-configuration-scala` you can adjust the ``akka.remote.watch-failure-detector.threshold`` +to define when a *phi* value is considered to be a failure. + +A low ``threshold`` is prone to generate many false positives but ensures +a quick detection in the event of a real crash. Conversely, a high ``threshold`` +generates fewer mistakes but needs more time to detect actual crashes. The +default ``threshold`` is 10 and is appropriate for most situations. However in +cloud environments, such as Amazon EC2, the value could be increased to 12 in +order to account for network issues that sometimes occur on such platforms. + +The following chart illustrates how *phi* increase with increasing time since the +previous heartbeat. + +.. image:: ../cluster/images/phi1.png + +Phi is calculated from the mean and standard deviation of historical +inter arrival times. The previous chart is an example for standard deviation +of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, +i.e. it is possible to determine failure more quickly. The curve looks like this for +a standard deviation of 100 ms. + +.. image:: ../cluster/images/phi2.png + +To be able to survive sudden abnormalities, such as garbage collection pauses and +transient network failures the failure detector is configured with a margin, +``akka.remote.watch-failure-detector.acceptable-heartbeat-pause``. You may want to +adjust the :ref:`remote-configuration-scala` of this depending on you environment. +This is how the curve looks like for ``acceptable-heartbeat-pause`` configured to +3 seconds. + +.. image:: ../cluster/images/phi3.png + Serialization ^^^^^^^^^^^^^ @@ -488,7 +522,7 @@ SSL SSL can be used as the remote transport by adding ``akka.remote.netty.ssl`` to the ``enabled-transport`` configuration section. See a description of the settings -in the :ref:`remoting-scala-configuration` section. +in the :ref:`remote-configuration-scala` section. The SSL support is implemented with Java Secure Socket Extension, please consult the offical `Java Secure Socket Extension documentation `_ @@ -501,3 +535,21 @@ and related resources for troubleshooting. Use '/dev/./urandom', not '/dev/urandom' as that doesn't work according to `Bug ID: 6202721 `_. +.. _remote-configuration-scala: + +Remote Configuration +^^^^^^^^^^^^^^^^^^^^ + +There are lots of configuration properties that are related to remoting in Akka. We refer to the following +reference file for more information: + +.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf + :language: none + +.. note:: + + Setting properties like the listening IP and port number programmatically is + best done by using something like the following: + + .. includecode:: ../java/code/docs/remoting/RemoteDeploymentDocTestBase.java#programmatic + diff --git a/akka-docs/rst/scala/routing.rst b/akka-docs/rst/scala/routing.rst index 8f224c460e..e55d6f7c46 100644 --- a/akka-docs/rst/scala/routing.rst +++ b/akka-docs/rst/scala/routing.rst @@ -89,9 +89,6 @@ There are a few gotchas to be aware of when creating routers: :class:`Props`, as it does not need to create routees. However, if you use a :ref:`resizable router ` then the routee :class:`Props` will be used whenever the resizer creates new routees. -* The same issues that apply to remotely-deployed actors also apply to remotely-deployed routees. - Read about :ref:`the limitations of remote deployment ` for - more information. Routers, Routees and Senders **************************** diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala new file mode 100644 index 0000000000..db2965e346 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import java.util.concurrent.TimeoutException +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorSystemImpl +import akka.actor.Props +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.testkit.TestEvent._ + +object RemoteDeploymentDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + """))) + + deployOn(second, """/hello.remote = "@third@" """) + + class Hello extends Actor { + def receive = Actor.emptyBehavior + } +} + +// Several different variations of the test + +class RemoteDeploymentDeathWatchFastMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchFastSpec +class RemoteDeploymentDeathWatchFastMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchFastSpec +class RemoteDeploymentDeathWatchFastMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchFastSpec +abstract class RemoteDeploymentNodeDeathWatchFastSpec extends RemoteDeploymentDeathWatchSpec { + override def scenario = "fast" +} + +class RemoteDeploymentDeathWatchSlowMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchSlowSpec +class RemoteDeploymentDeathWatchSlowMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchSlowSpec +class RemoteDeploymentDeathWatchSlowMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchSlowSpec +abstract class RemoteDeploymentNodeDeathWatchSlowSpec extends RemoteDeploymentDeathWatchSpec { + override def scenario = "slow" + override def sleep(): Unit = Thread.sleep(3000) +} + +abstract class RemoteDeploymentDeathWatchSpec + extends MultiNodeSpec(RemoteDeploymentDeathWatchMultiJvmSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteDeploymentDeathWatchMultiJvmSpec._ + + def scenario: String + // Possible to override to let them heartbeat for a while. + def sleep(): Unit = () + + override def initialParticipants = roles.size + + "An actor system that deploys actors on another node" must { + + "be able to shutdown when remote node crash" taggedAs LongRunningTest in within(20 seconds) { + runOn(second) { + // remote deployment to third + val hello = system.actorOf(Props[Hello], "hello") + hello.path.address must be(node(third).address) + enterBarrier("hello-deployed") + + enterBarrier("third-crashed") + + sleep() + // if the remote deployed actor is not removed the system will not shutdown + system.shutdown() + val timeout = remaining + try system.awaitTermination(timeout) catch { + case _: TimeoutException ⇒ + fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout, + system.asInstanceOf[ActorSystemImpl].printTree)) + } + } + + runOn(third) { + enterBarrier("hello-deployed") + enterBarrier("third-crashed") + } + + runOn(first) { + enterBarrier("hello-deployed") + sleep() + testConductor.shutdown(third, 0).await + enterBarrier("third-crashed") + + runOn(first) { + // second system will be shutdown, remove to not participate in barriers any more + testConductor.removeNode(second) + } + + enterBarrier("after-3") + } + + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala new file mode 100644 index 0000000000..1cfe6a497c --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -0,0 +1,357 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.PoisonPill +import akka.actor.Props +import akka.actor.Terminated +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ + +object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + """))) + + case class WatchIt(watchee: ActorRef) + case class UnwatchIt(watchee: ActorRef) + case object Ack + + /** + * Forwarding `Terminated` to non-watching testActor is not possible, + * and therefore the `Terminated` message is wrapped. + */ + case class WrappedTerminated(t: Terminated) + + class ProbeActor(testActor: ActorRef) extends Actor { + def receive = { + case WatchIt(watchee) ⇒ + context watch watchee + sender ! Ack + case UnwatchIt(watchee) ⇒ + context unwatch watchee + sender ! Ack + case t: Terminated ⇒ + testActor forward WrappedTerminated(t) + case msg ⇒ testActor forward msg + } + } + +} + +// Several different variations of the test + +class RemoteNodeDeathWatchFastMultiJvmNode1 extends RemoteNodeDeathWatchFastSpec +class RemoteNodeDeathWatchFastMultiJvmNode2 extends RemoteNodeDeathWatchFastSpec +abstract class RemoteNodeDeathWatchFastSpec extends RemoteNodeDeathWatchSpec { + override def scenario = "fast" +} + +class RemoteNodeDeathWatchSlowMultiJvmNode1 extends RemoteNodeDeathWatchSlowSpec +class RemoteNodeDeathWatchSlowMultiJvmNode2 extends RemoteNodeDeathWatchSlowSpec +abstract class RemoteNodeDeathWatchSlowSpec extends RemoteNodeDeathWatchSpec { + override def scenario = "slow" + override def sleep(): Unit = Thread.sleep(3000) +} + +abstract class RemoteNodeDeathWatchSpec + extends MultiNodeSpec(RemoteNodeDeathWatchMultiJvmSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteNodeDeathWatchMultiJvmSpec._ + import RemoteWatcher._ + + def scenario: String + // Possible to override to let them heartbeat for a while. + def sleep(): Unit = () + + override def initialParticipants = roles.size + + lazy val remoteWatcher: ActorRef = { + system.actorSelection("/system/remote-watcher") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + def assertCleanup(): Unit = { + awaitAssert { + remoteWatcher ! Stats + expectMsg(Stats.empty) + } + } + + "RemoteNodeDeathWatch (" + scenario + ")" must { + + "receive Terminated when remote actor is stopped" taggedAs LongRunningTest in { + runOn(first) { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher1") + enterBarrier("actors-started-1") + + val subject = identify(second, "subject1") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + subject ! "hello1" + enterBarrier("watch-established-1") + + sleep() + expectMsgType[WrappedTerminated].t.actor must be(subject) + } + + runOn(second) { + val subject = system.actorOf(Props(classOf[ProbeActor], testActor), "subject1") + enterBarrier("actors-started-1") + + expectMsg(3 seconds, "hello1") + enterBarrier("watch-established-1") + + sleep() + system.stop(subject) + } + + enterBarrier("terminated-verified-1") + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + + enterBarrier("after-1") + } + + "cleanup after watch/unwatch" taggedAs LongRunningTest in { + runOn(first) { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher2") + enterBarrier("actors-started-2") + + val subject = identify(second, "subject2") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("watch-2") + + sleep() + watcher ! UnwatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("unwatch-2") + } + + runOn(second) { + system.actorOf(Props(classOf[ProbeActor], testActor), "subject2") + enterBarrier("actors-started-2") + enterBarrier("watch-2") + enterBarrier("unwatch-2") + } + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + + enterBarrier("after-2") + } + + "cleanup after bi-directional watch/unwatch" taggedAs LongRunningTest in { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3") + system.actorOf(Props(classOf[ProbeActor], testActor), "subject3") + enterBarrier("actors-started-3") + + val other = if (myself == first) second else first + val subject = identify(other, "subject3") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("watch-3") + + sleep() + watcher ! UnwatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("unwatch-3") + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + + enterBarrier("after-3") + } + + "cleanup after bi-directional watch/stop/unwatch" taggedAs LongRunningTest in { + val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1") + val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2") + system.actorOf(Props(classOf[ProbeActor], testActor), "s1") + val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2") + enterBarrier("actors-started-3") + + val other = if (myself == first) second else first + val subject1 = identify(other, "s1") + val subject2 = identify(other, "s2") + watcher1 ! WatchIt(subject1) + expectMsg(1 second, Ack) + watcher2 ! WatchIt(subject2) + expectMsg(1 second, Ack) + enterBarrier("watch-4") + + sleep() + watcher1 ! UnwatchIt(subject1) + expectMsg(1 second, Ack) + system.stop(s2) + enterBarrier("unwatch-stop-4") + + expectMsgType[WrappedTerminated].t.actor must be(subject2) + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + + enterBarrier("after-4") + } + + "cleanup after stop" taggedAs LongRunningTest in { + runOn(first) { + val p1, p2, p3 = TestProbe() + val a1 = system.actorOf(Props(classOf[ProbeActor], p1.ref), "a1") + val a2 = system.actorOf(Props(classOf[ProbeActor], p2.ref), "a2") + val a3 = system.actorOf(Props(classOf[ProbeActor], p3.ref), "a3") + enterBarrier("actors-started-5") + + val b1 = identify(second, "b1") + val b2 = identify(second, "b2") + val b3 = identify(second, "b3") + + a1 ! WatchIt(b1) + expectMsg(1 second, Ack) + a1 ! WatchIt(b2) + expectMsg(1 second, Ack) + a2 ! WatchIt(b2) + expectMsg(1 second, Ack) + a3 ! WatchIt(b3) + expectMsg(1 second, Ack) + sleep() + a2 ! UnwatchIt(b2) + expectMsg(1 second, Ack) + + enterBarrier("watch-established-5") + + sleep() + a1 ! PoisonPill + a2 ! PoisonPill + a3 ! PoisonPill + + enterBarrier("stopped-5") + enterBarrier("terminated-verified-5") + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + } + + runOn(second) { + val p1, p2, p3 = TestProbe() + val b1 = system.actorOf(Props(classOf[ProbeActor], p1.ref), "b1") + val b2 = system.actorOf(Props(classOf[ProbeActor], p2.ref), "b2") + val b3 = system.actorOf(Props(classOf[ProbeActor], p3.ref), "b3") + enterBarrier("actors-started-5") + + val a1 = identify(first, "a1") + val a2 = identify(first, "a2") + val a3 = identify(first, "a3") + + b1 ! WatchIt(a1) + expectMsg(1 second, Ack) + b1 ! WatchIt(a2) + expectMsg(1 second, Ack) + b2 ! WatchIt(a2) + expectMsg(1 second, Ack) + b3 ! WatchIt(a3) + expectMsg(1 second, Ack) + b3 ! WatchIt(a3) + expectMsg(1 second, Ack) + sleep() + b2 ! UnwatchIt(a2) + expectMsg(1 second, Ack) + + enterBarrier("watch-established-5") + enterBarrier("stopped-5") + + p1.receiveN(2, 5 seconds).collect { case WrappedTerminated(t) ⇒ t.actor }.toSet must be(Set(a1, a2)) + p3.expectMsgType[WrappedTerminated](5 seconds).t.actor must be(a3) + p2.expectNoMsg(2 seconds) + enterBarrier("terminated-verified-5") + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + p1.expectNoMsg(100 millis) + p2.expectNoMsg(100 millis) + p3.expectNoMsg(100 millis) + assertCleanup() + } + + enterBarrier("after-5") + } + + "receive Terminated when watched node is shutdown" taggedAs LongRunningTest in { + runOn(first) { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher6") + val watcher2 = system.actorOf(Props(classOf[ProbeActor], system.deadLetters)) + enterBarrier("actors-started-6") + + val subject = identify(second, "subject6") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + watcher2 ! WatchIt(subject) + expectMsg(1 second, Ack) + subject ! "hello6" + + // testing with this watch/unwatch of watcher2 to make sure that the unwatch doesn't + // remove the first watch + watcher2 ! UnwatchIt(subject) + expectMsg(1 second, Ack) + + enterBarrier("watch-established-6") + + sleep() + + log.info("shutdown second") + testConductor.shutdown(second, 0).await + expectMsgType[WrappedTerminated](15 seconds).t.actor must be(subject) + + // verify that things are cleaned up, and heartbeating is stopped + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + } + + runOn(second) { + system.actorOf(Props(classOf[ProbeActor], testActor), "subject6") + enterBarrier("actors-started-6") + + expectMsg(3 seconds, "hello6") + enterBarrier("watch-established-6") + } + + enterBarrier("after-6") + } + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 91730759a6..c9550bbd0f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -57,20 +57,20 @@ akka { ### General settings - # Timeout after which the startup of the remoting subsystem is considered to be failed. - # Increase this value if your transport drivers (see the enabled-transports section) - # need longer time to be loaded. + # Timeout after which the startup of the remoting subsystem is considered + # to be failed. Increase this value if your transport drivers (see the + # enabled-transports section) need longer time to be loaded. startup-timeout = 10 s - # Timout after which the graceful shutdown of the remoting subsystem is considered to be failed. - # After the timeout the remoting system is forcefully shut down. - # Increase this value if your transport drivers (see the enabled-transports section) - # need longer time to stop properly. + # Timout after which the graceful shutdown of the remoting subsystem is + # considered to be failed. After the timeout the remoting system is + # forcefully shut down. Increase this value if your transport drivers + # (see the enabled-transports section) need longer time to stop properly. shutdown-timeout = 10 s - # Before shutting down the drivers, the remoting subsystem attempts to flush all pending - # writes. This setting controls the maximum time the remoting is willing to wait before - # moving on to shut down the drivers. + # Before shutting down the drivers, the remoting subsystem attempts to flush + # all pending writes. This setting controls the maximum time the remoting is + # willing to wait before moving on to shut down the drivers. flush-wait-on-shutdown = 2 s # Reuse inbound connections for outbound messages @@ -132,7 +132,7 @@ akka { # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the remoting subsystem to detect failed connections. - failure-detector { + transport-failure-detector { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have @@ -167,6 +167,59 @@ akka { acceptable-heartbeat-pause = 3 s } + # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf + # [Hayashibara et al]) used for remote death watch. + watch-failure-detector { + + # FQCN of the failure detector implementation. + # It must implement akka.remote.FailureDetector and have + # a public constructor with a com.typesafe.config.Config parameter. + implementation-class = "akka.remote.PhiAccrualFailureDetector" + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1 s + + # Defines the failure detector threshold. + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes. + threshold = 10.0 + + # Number of the samples of inter-heartbeat arrival times to adaptively + # calculate the failure timeout for connections. + max-sample-size = 200 + + # Minimum standard deviation to use for the normal distribution in + # AccrualFailureDetector. Too low standard deviation might result in + # too much sensitivity for sudden, but normal, deviations in heartbeat + # inter arrival times. + min-std-deviation = 100 ms + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 4 s + + + # How often to check for nodes marked as unreachable by the failure + # detector + unreachable-nodes-reaper-interval = 1s + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat mesage has + # been received. + expected-response-after = 3 s + + # When a node unwatch another node it will end that + # with this number of EndHeartbeatRequest messages, which will stop the + # heartbeating from the other side + nr-of-end-heartbeats = 8 + + } + # After failed to establish an outbound connection, the remoting will mark the # address as failed. This configuration option controls how much time should # be elapsed before reattempting a new connection. While the address is diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 70c9be3744..b0f3d8c3b2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -14,6 +14,8 @@ import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import scala.util.control.Exception.Catcher import scala.concurrent.{ ExecutionContext, Future } +import com.typesafe.config.Config +import akka.ConfigurationException /** * INTERNAL API @@ -133,21 +135,22 @@ private[akka] class RemoteActorRefProvider( override def tempPath(): ActorPath = local.tempPath() override def tempContainer: VirtualPathContainer = local.tempContainer - @volatile - private var _internals: Internals = _ + @volatile private var _internals: Internals = _ def transport: RemoteTransport = _internals.transport def serialization: Serialization = _internals.serialization def remoteDaemon: InternalActorRef = _internals.remoteDaemon // This actor ensures the ordering of shutdown between remoteDaemon and the transport - @volatile - private var remotingTerminator: ActorRef = _ + @volatile private var remotingTerminator: ActorRef = _ + + @volatile private var remoteWatcher: ActorRef = _ + @volatile private var remoteDeploymentWatcher: ActorRef = _ def init(system: ActorSystemImpl): Unit = { local.init(system) - remotingTerminator = system.systemActorOf(Props(new RemotingTerminator(local.systemGuardian)), "remoting-terminator") + remotingTerminator = system.systemActorOf(Props(classOf[RemotingTerminator], local.systemGuardian), "remoting-terminator") val internals = Internals( remoteDaemon = { @@ -172,8 +175,37 @@ private[akka] class RemoteActorRefProvider( // this enables reception of remote requests transport.start() + remoteWatcher = createRemoteWatcher(system) + remoteDeploymentWatcher = createRemoteDeploymentWatcher(system) } + protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = { + import remoteSettings._ + val failureDetector = createRemoteWatcherFailureDetector(system) + system.systemActorOf(RemoteWatcher.props( + failureDetector, + heartbeatInterval = WatchHeartBeatInterval, + unreachableReaperInterval = WatchUnreachableReaperInterval, + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, + numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher") + } + + protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { + def createFailureDetector(): FailureDetector = { + import remoteSettings.{ WatchFailureDetectorImplementationClass ⇒ fqcn } + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, List(classOf[Config] -> remoteSettings.WatchFailureDetectorConfig)).recover({ + case e ⇒ throw new ConfigurationException( + s"Could not create custom remote watcher failure detector [$fqcn] due to: ${e.toString}", e) + }).get + } + + new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) + } + + protected def createRemoteDeploymentWatcher(system: ActorSystemImpl): ActorRef = + system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher") + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = { if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async) @@ -352,6 +384,8 @@ private[akka] class RemoteActorRefProvider( // before someone can send messages to it resolveActorRef(RootActorPath(ref.path.address) / "remote") ! DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor) + + remoteDeploymentWatcher ! RemoteDeploymentWatcher.WatchRemote(ref, supervisor) } def getExternalAddressFor(addr: Address): Option[Address] = { @@ -367,6 +401,22 @@ private[akka] class RemoteActorRefProvider( private def hasAddress(address: Address): Boolean = address == local.rootPath.address || address == rootPath.address || transport.addresses(address) + def quarantine(address: Address, uid: Int): Unit = { + // FIXME send to EndpointManager + } + + /** + * INTERNAL API + */ + private[akka] def afterSendSystemMessage(message: SystemMessage): Unit = + message match { + // Sending to local remoteWatcher relies strong delivery guarantees of local send, i.e. + // default dispatcher must not be changed to an implementation that defeats that + case Watch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher) + case Unwatch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) + case _ ⇒ + } + } private[akka] trait RemoteRef extends ActorRefScope { @@ -406,7 +456,11 @@ private[akka] class RemoteActorRef private[akka] ( remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send")) } - def sendSystemMessage(message: SystemMessage): Unit = try remote.send(message, None, this) catch handleException + def sendSystemMessage(message: SystemMessage): Unit = + try { + remote.send(message, None, this) + provider.afterSendSystemMessage(message) + } catch handleException override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { if (message == null) throw new InvalidMessageException("Message is null") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala new file mode 100644 index 0000000000..8276c2bda0 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.remote + +import akka.actor.InternalActorRef +import akka.actor.Terminated +import akka.actor.Actor +import akka.actor.ActorRef +import akka.dispatch.sysmsg.DeathWatchNotification + +/** + * INTERNAL API + */ +private[akka] object RemoteDeploymentWatcher { + case class WatchRemote(actor: ActorRef, supervisor: ActorRef) +} + +/** + * INTERNAL API + * + * Responsible for cleaning up child references of remote deployed actors when remote node + * goes down (jvm crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]]. + */ +private[akka] class RemoteDeploymentWatcher extends Actor { + import RemoteDeploymentWatcher._ + var supervisors = Map.empty[ActorRef, InternalActorRef] + + def receive = { + case WatchRemote(a, supervisor: InternalActorRef) ⇒ + supervisors += (a -> supervisor) + context.watch(a) + + case t @ Terminated(a) if supervisors isDefinedAt a ⇒ + // send extra DeathWatchNotification to the supervisor so that it will remove the child + supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) + supervisors -= a + + case _: Terminated ⇒ + } +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 2a2a64e4f6..ab2556a5fa 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.util.Timeout import scala.collection.immutable +import akka.util.Helpers.Requiring import akka.japi.Util._ class RemoteSettings(val config: Config) { @@ -44,6 +45,21 @@ class RemoteSettings(val config: Config) { val CommandAckTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) + val WatchFailureDetectorConfig: Config = getConfig("akka.remote.watch-failure-detector") + val WatchFailureDetectorImplementationClass: String = WatchFailureDetectorConfig.getString("implementation-class") + val WatchHeartBeatInterval: FiniteDuration = { + Duration(WatchFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) + } requiring (_ > Duration.Zero, "watch-failure-detector.heartbeat-interval must be > 0") + val WatchUnreachableReaperInterval: FiniteDuration = { + Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) + } requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0") + val WatchNumberOfEndHeartbeatRequests: Int = { + WatchFailureDetectorConfig.getInt("nr-of-end-heartbeats") + } requiring (_ > 0, "watch-failure-detector.nr-of-end-heartbeats must be > 0") + val WatchHeartbeatExpectedResponseAfter: FiniteDuration = { + Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS) + } requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0") + val Transports: immutable.Seq[(String, immutable.Seq[String], Config)] = transportNames.map { name ⇒ val transportConfig = transportConfigFor(name) (transportConfig.getString("transport-class"), diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala new file mode 100644 index 0000000000..2be4989770 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -0,0 +1,311 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.AddressTerminated +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.actor.ExtendedActorSystem +import akka.ConfigurationException + +/** + * INTERNAL API + */ +private[akka] object RemoteWatcher { + + /** + * Factory method for `RemoteWatcher` [[akka.actor.Props]]. + */ + def props( + failureDetector: FailureDetectorRegistry[Address], + heartbeatInterval: FiniteDuration, + unreachableReaperInterval: FiniteDuration, + heartbeatExpectedResponseAfter: FiniteDuration, + numberOfEndHeartbeatRequests: Int): Props = + Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, + heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) + + case class WatchRemote(watchee: ActorRef, watcher: ActorRef) + case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) + + @SerialVersionUID(1L) case object HeartbeatRequest + @SerialVersionUID(1L) case object EndHeartbeatRequest + @SerialVersionUID(1L) case class Heartbeat(addressUid: Int) + + // sent to self only + case object HeartbeatTick + case object ReapUnreachableTick + case class ExpectedFirstHeartbeat(from: Address) + + // test purpose + object Stats { + lazy val empty: Stats = counts(0, 0, 0) + def counts(watching: Int, watchingNodes: Int, watchedByNodes: Int): Stats = + new Stats(watching, watchingNodes, watchedByNodes)(Set.empty) + } + case class Stats(watching: Int, watchingNodes: Int, watchedByNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { + override def toString: String = { + def formatWatchingRefs: String = + if (watchingRefs.isEmpty) "" + else ", watchingRefs=" + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") + + s"Stats(watching=${watching}, watchingNodes=${watchingNodes}, watchedByNodes=${watchedByNodes}${formatWatchingRefs})" + } + } +} + +/** + * INTERNAL API + * + * Remote nodes with actors that are watched are monitored by this actor to be able + * to detect network failures and JVM crashes. [[akka.remote.RemoteActorRefProvider]] + * intercepts Watch and Unwatch system messages and sends corresponding + * [[RemoteWatcher.WatchRemote]] and [[RemoteWatcher.UnwatchRemote]] to this actor. + * + * For a new node to be watched this actor starts the monitoring by sending [[RemoteWatcher.HeartbeatRequest]] + * to the peer actor on the other node, which then sends periodic [[RemoteWatcher.Heartbeat]] + * messages back. The failure detector on the watching side monitors these heartbeat messages. + * If arrival of hearbeat messages stops it will be detected and this actor will publish + * [[akka.actor.AddressTerminated]] to the `eventStream`. + * + * When all actors on a node have been unwatched, or terminated, this actor sends + * [[RemoteWatcher.EndHeartbeatRequest]] messages to the peer actor on the other node, + * which will then stop sending heartbeat messages. + * + * The actor sending heartbeat messages will also watch the peer on the other node, + * to be able to stop sending heartbeat messages in case of network failure or JVM crash. + * + * For bi-directional watch between two nodes the same thing will be established in + * both directions, but independent of each other. + * + */ +private[akka] class RemoteWatcher( + failureDetector: FailureDetectorRegistry[Address], + heartbeatInterval: FiniteDuration, + unreachableReaperInterval: FiniteDuration, + heartbeatExpectedResponseAfter: FiniteDuration, + numberOfEndHeartbeatRequests: Int) + extends Actor with ActorLogging { + + import RemoteWatcher._ + import context.dispatcher + def scheduler = context.system.scheduler + + val remoteProvider: RemoteActorRefProvider = context.system.asInstanceOf[ExtendedActorSystem].provider match { + case rarp: RemoteActorRefProvider ⇒ rarp + case other ⇒ throw new ConfigurationException( + s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]") + } + + val selfHeartbeatMsg = Heartbeat(AddressUidExtension(context.system).addressUid) + + // actors that this node is watching, tuple with (watcher, watchee) + var watching: Set[(ActorRef, ActorRef)] = Set.empty + // nodes that this node is watching, i.e. expecting hearteats from these nodes + var watchingNodes: Set[Address] = Set.empty + // heartbeats will be sent to watchedByNodes, ref is RemoteWatcher at other side + var watchedByNodes: Set[ActorRef] = Set.empty + var unreachable: Set[Address] = Set.empty + var endWatchingNodes: Map[Address, Int] = Map.empty + var addressUids: Map[Address, Int] = Map.empty + + val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick) + val failureDetectorReaperTask = scheduler.schedule(unreachableReaperInterval, unreachableReaperInterval, + self, ReapUnreachableTick) + + override def postStop(): Unit = { + super.postStop() + heartbeatTask.cancel() + failureDetectorReaperTask.cancel() + } + + def receive = { + case HeartbeatTick ⇒ + sendHeartbeat() + sendHeartbeatRequest() + sendEndHeartbeatRequest() + case Heartbeat(uid) ⇒ heartbeat(uid) + case ReapUnreachableTick ⇒ reapUnreachable() + case HeartbeatRequest ⇒ heartbeatRequest() + case EndHeartbeatRequest ⇒ endHeartbeatRequest() + case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) + case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher) + case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher) + case Terminated(watchee) ⇒ terminated(watchee) + + // test purpose + case Stats ⇒ + sender ! Stats( + watching = watching.size, + watchingNodes = watchingNodes.size, + watchedByNodes = watchedByNodes.size)(watching) + } + + def heartbeat(uid: Int): Unit = { + val from = sender.path.address + + if (failureDetector.isMonitoring(from)) + log.debug("Received heartbeat from [{}]", from) + else + log.debug("Received first heartbeat from [{}]", from) + + if (watchingNodes(from) && !unreachable(from)) { + addressUids += (from -> uid) + failureDetector.heartbeat(from) + } + } + + def heartbeatRequest(): Unit = { + // request to start sending heartbeats to the node + log.debug("Received HeartbeatRequest from [{}]", sender.path.address) + watchedByNodes += sender + // watch back to stop heartbeating if other side dies + context watch sender + watching += ((sender, self)) + } + + def endHeartbeatRequest(): Unit = { + // request to stop sending heartbeats to the node + log.debug("Received EndHeartbeatRequest from [{}]", sender.path.address) + watchedByNodes -= sender + context unwatch sender + watching -= ((sender, self)) + checkLastUnwatchOfNode(sender.path.address) + } + + def reapUnreachable(): Unit = + watchingNodes foreach { a ⇒ + if (!unreachable(a) && !failureDetector.isAvailable(a)) { + log.warning("Detected unreachable: [{}]", a) + publishAddressTerminated(a) + unreachable += a + } + } + + def publishAddressTerminated(address: Address): Unit = { + addressUids.get(address) foreach { uid ⇒ remoteProvider.quarantine(address, uid) } + context.system.eventStream.publish(AddressTerminated(address)) + } + + def watchRemote(watchee: ActorRef, watcher: ActorRef): Unit = + if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) + logActorForDeprecationWarning(watchee) + else if (watcher != self) { + log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) + watching += ((watchee, watcher)) + val watcheeAddress = watchee.path.address + if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) { + // first watch to that node after a previous unreachable + unreachable -= watcheeAddress + failureDetector.remove(watcheeAddress) + } + watchingNodes += watcheeAddress + endWatchingNodes -= watcheeAddress + + // also watch from self, to be able to cleanup on termination of the watchee + context watch watchee + watching += ((watchee, self)) + } + + def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = + if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) + logActorForDeprecationWarning(watchee) + else if (watcher != self) { + log.debug("Unwatching: [{} -> {}]", watcher.path, watchee.path) + watching -= ((watchee, watcher)) + + // clean up self watch when no more watchers of this watchee + if (watching.forall { case (wee, wer) ⇒ wee != watchee || wer == self }) { + log.debug("Cleanup self watch of [{}]", watchee.path) + context unwatch watchee + watching -= ((watchee, self)) + } + checkLastUnwatchOfNode(watchee.path.address) + } + + def logActorForDeprecationWarning(watchee: ActorRef): Unit = { + log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) + } + + def terminated(watchee: ActorRef): Unit = { + if (matchingPathElements(self, watchee)) { + log.debug("Other side terminated: [{}]", watchee.path) + // stop heartbeating to that node immediately, and cleanup + watchedByNodes -= watchee + watching -= ((watchee, self)) + } else { + log.debug("Watchee terminated: [{}]", watchee.path) + watching = watching.filterNot { + case (wee, _) ⇒ wee == watchee + } + checkLastUnwatchOfNode(watchee.path.address) + } + } + + def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { + if (watchingNodes(watcheeAddress) && watching.forall { + case (wee, wer) ⇒ wee.path.address != watcheeAddress || (wer == self && matchingPathElements(self, wee)) + }) { + // unwatched last watchee on that node, not counting RemoteWatcher peer + log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) + watchingNodes -= watcheeAddress + addressUids -= watcheeAddress + // continue by sending EndHeartbeatRequest for a while + endWatchingNodes += (watcheeAddress -> 0) + failureDetector.remove(watcheeAddress) + } + } + + def matchingPathElements(a: ActorRef, b: ActorRef): Boolean = + a.path.elements == b.path.elements + + def sendHeartbeat(): Unit = + watchedByNodes foreach { ref ⇒ + val a = ref.path.address + if (!unreachable(a)) { + log.debug("Sending Heartbeat to [{}]", ref.path.address) + ref ! selfHeartbeatMsg + } + } + + def sendHeartbeatRequest(): Unit = + watchingNodes.foreach { a ⇒ + if (!unreachable(a) && !failureDetector.isMonitoring(a)) { + log.debug("Sending HeartbeatRequest to [{}]", a) + context.actorSelection(RootActorPath(a) / self.path.elements) ! HeartbeatRequest + // schedule the expected heartbeat for later, which will give the + // other side a chance to start heartbeating, and also trigger some resends of + // the heartbeat request + scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a)) + endWatchingNodes -= a + } + } + + def sendEndHeartbeatRequest(): Unit = + endWatchingNodes.foreach { + case (a, count) ⇒ + if (!unreachable(a)) { + log.debug("Sending EndHeartbeatRequest to [{}]", a) + context.actorSelection(RootActorPath(a) / self.path.elements) ! EndHeartbeatRequest + } + if (count == numberOfEndHeartbeatRequests - 1) { + endWatchingNodes -= a + } else { + endWatchingNodes += (a -> (count + 1)) + } + } + + def triggerFirstHeartbeat(address: Address): Unit = + if (watchingNodes(address) && !failureDetector.isMonitoring(address)) { + log.debug("Trigger extra expected heartbeat from [{}]", address) + failureDetector.heartbeat(address) + } + +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index b55a55d9c2..835fb3cbdf 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -15,6 +15,7 @@ import akka.remote.transport.AssociationHandle._ import akka.remote.transport.ProtocolStateActor._ import akka.remote.transport.Transport._ import akka.util.ByteString +import akka.util.Helpers.Requiring import akka.{ OnlyCauseStackTrace, AkkaException } import com.typesafe.config.Config import scala.collection.immutable @@ -31,15 +32,11 @@ private[remote] class AkkaProtocolSettings(config: Config) { import config._ - val FailureDetectorConfig: Config = getConfig("akka.remote.failure-detector") - - val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class") - - val AcceptableHeartBeatPause: FiniteDuration = - Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) - - val HeartBeatInterval: FiniteDuration = - Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) + val TransportFailureDetectorConfig: Config = getConfig("akka.remote.transport-failure-detector") + val TransportFailureDetectorImplementationClass: String = TransportFailureDetectorConfig.getString("implementation-class") + val TransportHeartBeatInterval: FiniteDuration = { + Duration(TransportFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) + } requiring (_ > Duration.Zero, "transport-failure-detector.heartbeat-interval must be > 0") val WaitActivityEnabled: Boolean = getBoolean("akka.remote.wait-activity-enabled") @@ -117,7 +114,7 @@ private[transport] class AkkaProtocolManager( val stateActorLocalAddress = localAddress val stateActorAssociationHandler = associationListener val stateActorSettings = settings - val failureDetector = createFailureDetector() + val failureDetector = createTransportFailureDetector() context.actorOf(Props(new ProtocolStateActor( stateActorLocalAddress, handle, @@ -130,7 +127,7 @@ private[transport] class AkkaProtocolManager( val stateActorLocalAddress = localAddress val stateActorSettings = settings val stateActorWrappedTransport = wrappedTransport - val failureDetector = createFailureDetector() + val failureDetector = createTransportFailureDetector() context.actorOf(Props(new ProtocolStateActor( stateActorLocalAddress, remoteAddress, @@ -141,10 +138,10 @@ private[transport] class AkkaProtocolManager( failureDetector)), actorNameFor(remoteAddress)) // Why don't we watch this one? } - private def createFailureDetector(): FailureDetector = { - import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + private def createTransportFailureDetector(): FailureDetector = { + import settings.{ TransportFailureDetectorImplementationClass ⇒ fqcn } context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({ + fqcn, List(classOf[Config] -> settings.TransportFailureDetectorConfig)).recover({ case e ⇒ throw new ConfigurationException( s"Could not create custom remote failure detector [$fqcn] due to: ${e.toString}", e) }).get @@ -397,7 +394,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } private def initTimers(): Unit = { - setTimer("heartbeat-timer", HeartbeatTimer, settings.HeartBeatInterval, repeat = true) + setTimer("heartbeat-timer", HeartbeatTimer, settings.TransportHeartBeatInterval, repeat = true) } private def handleTimers(wrappedHandle: AssociationHandle): State = { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 7458b25b8b..a72d792083 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -46,6 +46,16 @@ class RemoteConfigSpec extends AkkaSpec( "gremlin" -> classOf[akka.remote.transport.FailureInjectorProvider].getName, "trttl" -> classOf[akka.remote.transport.ThrottlerProvider].getName)) + WatchFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) + WatchHeartBeatInterval must be(1 seconds) + WatchNumberOfEndHeartbeatRequests must be(8) + WatchHeartbeatExpectedResponseAfter must be(3 seconds) + WatchUnreachableReaperInterval must be(1 second) + WatchFailureDetectorConfig.getDouble("threshold") must be(10.0 plusOrMinus 0.0001) + WatchFailureDetectorConfig.getInt("max-sample-size") must be(200) + Duration(WatchFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(4 seconds) + Duration(WatchFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) + } "be able to parse AkkaProtocol related config elements" in { @@ -53,15 +63,15 @@ class RemoteConfigSpec extends AkkaSpec( import settings._ WaitActivityEnabled must be(true) - FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) - AcceptableHeartBeatPause must be === 3.seconds - HeartBeatInterval must be === 1.seconds RequireCookie must be(false) SecureCookie must be === "" - FailureDetectorConfig.getDouble("threshold") must be(7.0 plusOrMinus 0.0001) - FailureDetectorConfig.getInt("max-sample-size") must be(100) - Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) + TransportFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) + TransportHeartBeatInterval must be === 1.seconds + TransportFailureDetectorConfig.getDouble("threshold") must be(7.0 plusOrMinus 0.0001) + TransportFailureDetectorConfig.getInt("max-sample-size") must be(100) + Duration(TransportFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(3 seconds) + Duration(TransportFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala new file mode 100644 index 0000000000..c74f436185 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -0,0 +1,352 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import akka.testkit._ +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.ExtendedActorSystem +import akka.actor.RootActorPath +import akka.actor.Identify +import akka.actor.ActorIdentity +import akka.actor.PoisonPill +import akka.actor.AddressTerminated +import akka.actor.MinimalActorRef +import akka.actor.Address + +object RemoteWatcherSpec { + + class TestActorProxy(testActor: ActorRef) extends Actor { + def receive = { + case msg ⇒ testActor forward msg + } + } + + class MyActor extends Actor { + def receive = Actor.emptyBehavior + } + + case class WrappedAddressTerminated(msg: AddressTerminated) + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RemoteWatcherSpec extends AkkaSpec( + """akka { + # loglevel = DEBUG + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty.tcp { + hostname = localhost + port = 0 + } + }""") with ImplicitSender { + + import RemoteWatcherSpec._ + import RemoteWatcher._ + + override def expectedTestDuration = 2.minutes + + val remoteSystem = ActorSystem("RemoteSystem", system.settings.config) + val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + + override def afterTermination() { + remoteSystem.shutdown() + } + + val heartbeatMsgB = Heartbeat(AddressUidExtension(remoteSystem).addressUid) + + def createRemoteActor(props: Props, name: String): ActorRef = { + remoteSystem.actorOf(props, name) + system.actorSelection(RootActorPath(remoteAddress) / "user" / name) ! Identify(name) + expectMsgType[ActorIdentity].ref.get + } + + // turn off all periodic activity + val TurnOff = 5.minutes + + def nonScheduledRemoteWatcherProps(): Props = { + val fd = createFailureDetector() + RemoteWatcher.props( + fd, + heartbeatInterval = TurnOff, + unreachableReaperInterval = TurnOff, + heartbeatExpectedResponseAfter = TurnOff, + numberOfEndHeartbeatRequests = 3) + } + + def createFailureDetector(): FailureDetectorRegistry[Address] = { + def createFailureDetector(): FailureDetector = + new PhiAccrualFailureDetector( + threshold = 8.0, + maxSampleSize = 200, + minStdDeviation = 100.millis, + acceptableHeartbeatPause = 3.seconds, + firstHeartbeatEstimate = 1.second) + + new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) + } + + // AddressTerminated is AutoReceiveMessage + def addressTerminatedSubscriber(fwTo: ActorRef) = new MinimalActorRef { + override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match { + case msg: AddressTerminated ⇒ fwTo.tell(WrappedAddressTerminated(msg), sender) + } + override val path = system / "testSubscriber" / fwTo.path.name + override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide") + } + + "A RemoteWatcher" must { + + "have correct interaction when watching" in { + + val fd = createFailureDetector() + val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor1") + val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") + + val a1 = system.actorOf(Props[MyActor], "a1") + val a2 = system.actorOf(Props[MyActor], "a2") + val b1 = createRemoteActor(Props[MyActor], "b1") + val b2 = createRemoteActor(Props[MyActor], "b2") + + monitorA ! WatchRemote(b1, a1) + monitorA ! WatchRemote(b2, a1) + monitorA ! WatchRemote(b2, a2) + monitorA ! Stats + // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2 + // (a1->b1), (a1->b2), (a2->b2) + expectMsg(Stats.counts(watching = 5, watchingNodes = 1, watchedByNodes = 0)) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + expectNoMsg(100 millis) + monitorA.tell(heartbeatMsgB, monitorB) + monitorA ! HeartbeatTick + expectNoMsg(100 millis) + + monitorA ! UnwatchRemote(b1, a1) + // still (a1->b2) and (a2->b2) left + monitorA ! Stats + expectMsg(Stats.counts(watching = 3, watchingNodes = 1, watchedByNodes = 0)) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectNoMsg(100 millis) + + monitorA ! UnwatchRemote(b2, a2) + // still (a1->b2) left + monitorA ! Stats + expectMsg(Stats.counts(watching = 2, watchingNodes = 1, watchedByNodes = 0)) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectNoMsg(100 millis) + + monitorA ! UnwatchRemote(b2, a1) + // all unwatched + monitorA ! Stats + expectMsg(Stats.empty) + expectNoMsg(100 millis) + // expecting 3 EndHeartbeatRequest + monitorA ! HeartbeatTick + expectMsg(EndHeartbeatRequest) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectMsg(EndHeartbeatRequest) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectMsg(EndHeartbeatRequest) + expectNoMsg(100 millis) + monitorA ! HeartbeatTick + expectNoMsg(100 millis) + + // make sure nothing floods over to next test + expectNoMsg(2 seconds) + } + + "have correct interaction when beeing watched" in { + + val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2") + val monitorB = createRemoteActor(nonScheduledRemoteWatcherProps(), "monitor2") + + val b3 = createRemoteActor(Props[MyActor], "b3") + + // watch + monitorB.tell(HeartbeatRequest, monitorA) + monitorB ! Stats + // HeartbeatRequest adds cross watch to RemoteWatcher peer + expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1)) + expectNoMsg(100 millis) + monitorB ! HeartbeatTick + expectMsg(heartbeatMsgB) + expectNoMsg(100 millis) + monitorB ! HeartbeatTick + expectMsg(heartbeatMsgB) + expectNoMsg(100 millis) + + // unwatch + monitorB.tell(EndHeartbeatRequest, monitorA) + monitorB ! Stats + // EndHeartbeatRequest should remove the cross watch + expectMsg(Stats.empty) + expectNoMsg(100 millis) + monitorB ! HeartbeatTick + expectNoMsg(100 millis) + + // start heartbeating again + monitorB.tell(HeartbeatRequest, monitorA) + monitorB ! Stats + expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1)) + expectNoMsg(100 millis) + monitorB ! HeartbeatTick + expectMsg(heartbeatMsgB) + expectNoMsg(100 millis) + + // then kill other side, which should stop the heartbeating + monitorA ! PoisonPill + awaitAssert { + monitorB ! Stats + expectMsg(Stats.empty) + } + monitorB ! HeartbeatTick + expectNoMsg(500 millis) + + // make sure nothing floods over to next test + expectNoMsg(2 seconds) + } + + "generate AddressTerminated when missing heartbeats" in { + val p = TestProbe() + system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + + val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor4") + val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4") + + val a = system.actorOf(Props[MyActor], "a4") + val b = createRemoteActor(Props[MyActor], "b4") + + monitorA ! WatchRemote(b, a) + + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + monitorA.tell(heartbeatMsgB, monitorB) + expectNoMsg(1 second) + monitorA.tell(heartbeatMsgB, monitorB) + + within(10 seconds) { + awaitAssert { + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + } + } + + // make sure nothing floods over to next test + expectNoMsg(2 seconds) + } + + "generate AddressTerminated when missing first heartbeat" in { + val p = TestProbe() + system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + + val fd = createFailureDetector() + val monitorA = system.actorOf(RemoteWatcher.props( + fd, + heartbeatInterval = TurnOff, + unreachableReaperInterval = TurnOff, + heartbeatExpectedResponseAfter = 2.seconds, + numberOfEndHeartbeatRequests = 3), "monitor5") + val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") + + val a = system.actorOf(Props[MyActor], "a5") + val b = createRemoteActor(Props[MyActor], "b5") + + monitorA ! WatchRemote(b, a) + + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + // no heartbeats sent + + within(20 seconds) { + awaitAssert { + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + } + } + + // some more HeartbeatRequest may be sent + receiveWhile(1.second) { + case HeartbeatRequest ⇒ // ok + } + + // make sure nothing floods over to next test + expectNoMsg(2 seconds) + + } + + "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { + val p = TestProbe() + system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + + val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor6") + val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6") + + val a = system.actorOf(Props[MyActor], "a6") + val b = createRemoteActor(Props[MyActor], "b6") + + monitorA ! WatchRemote(b, a) + + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + monitorA.tell(heartbeatMsgB, monitorB) + expectNoMsg(1 second) + monitorA.tell(heartbeatMsgB, monitorB) + + within(10 seconds) { + awaitAssert { + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + } + } + + // assume that connection comes up again, or remote system is restarted + val c = createRemoteActor(Props[MyActor], "c6") + + monitorA ! WatchRemote(b, a) + + monitorA ! HeartbeatTick + expectMsg(HeartbeatRequest) + monitorA.tell(heartbeatMsgB, monitorB) + expectNoMsg(1 second) + monitorA.tell(heartbeatMsgB, monitorB) + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectNoMsg(1 second) + monitorA.tell(heartbeatMsgB, monitorB) + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectNoMsg(1 second) + + // then stop heartbeating again, should generate new AddressTerminated + within(10 seconds) { + awaitAssert { + monitorA ! HeartbeatTick + monitorA ! ReapUnreachableTick + p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + } + } + + // make sure nothing floods over to next test + expectNoMsg(2 seconds) + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index aa9daf6e94..3f4a9cd08f 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -35,7 +35,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re """ akka.remote { - failure-detector { + transport-failure-detector { implementation-class = "akka.remote.PhiAccrualFailureDetector" threshold = 7.0 max-sample-size = 100 diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index 19163c3e6e..699cc23a67 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -19,7 +19,7 @@ object AkkaProtocolStressTest { remote.retry-gate-closed-for = 0 s remote.log-remote-lifecycle-events = on - remote.failure-detector { + remote.transport-failure-detector { threshold = 1.0 max-sample-size = 2 min-std-deviation = 1 ms