diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d02f37cf7b..8f6f5d70f1 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -82,7 +82,8 @@ case class Terminated private[akka] (@BeanProperty actor: ActorRef)( * INTERNAL API * * Used for remote death watch. Failure detector publish this to the - * `eventStream` when a remote node is detected to be unreachable. + * `eventStream` when a remote node is detected to be unreachable and/or decided to + * be removed. * The watcher ([[akka.actor.DeathWatch]]) subscribes to the `eventStream` * and translates this event to [[akka.actor.Terminated]], which is sent itself. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index f82f5e8835..bb90764837 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -296,9 +296,16 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } publish(event) - case MemberUnreachable(m) ⇒ + case MemberDowned(m) ⇒ + // TODO this case might be collapsed with MemberRemoved, see ticket #2788 + // but right now we don't change Downed to Removed publish(event) - // notify DeathWatch about unreachable node + // notify DeathWatch about downed node + publish(AddressTerminated(m.address)) + + case MemberRemoved(m) ⇒ + publish(event) + // notify DeathWatch about removed node publish(AddressTerminated(m.address)) case _ ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 3c1b41c950..d711aec55f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -3,6 +3,8 @@ */ package akka.cluster +import language.postfixOps +import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig @@ -54,7 +56,7 @@ abstract class ClusterDeathWatchSpec } "An actor watching a remote actor in the cluster" must { - "receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in { + "receive Terminated when watched node becomes Down" taggedAs LongRunningTest in within(20 seconds) { awaitClusterUp(roles: _*) enterBarrier("cluster-up") @@ -76,10 +78,12 @@ abstract class ClusterDeathWatchSpec watchEstablished.await enterBarrier("watch-established") expectMsg(path2) - expectNoMsg + expectNoMsg(2 seconds) enterBarrier("second-terminated") markNodeAsUnavailable(third) + awaitCond(clusterView.unreachableMembers.exists(_.address == address(third))) + cluster.down(third) expectMsg(path3) enterBarrier("third-terminated") @@ -91,6 +95,8 @@ abstract class ClusterDeathWatchSpec enterBarrier("watch-established") runOn(third) { markNodeAsUnavailable(second) + awaitCond(clusterView.unreachableMembers.exists(_.address == address(second))) + cluster.down(second) } enterBarrier("second-terminated") enterBarrier("third-terminated") @@ -132,7 +138,7 @@ abstract class ClusterDeathWatchSpec enterBarrier("after-3") } - "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in { + "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) { runOn(fourth) { val hello = system.actorOf(Props[Hello], "hello") hello.isInstanceOf[RemoteActorRef] must be(true) @@ -141,6 +147,9 @@ abstract class ClusterDeathWatchSpec enterBarrier("hello-deployed") markNodeAsUnavailable(first) + awaitCond(clusterView.unreachableMembers.exists(_.address == address(first))) + cluster.down(first) + val t = expectMsgType[Terminated] t.actor must be(hello)