diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index 5d378cbfbe..8023314022 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -337,7 +337,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac var contacts = initialContactsSel sendGetContacts() - var contactPathsPublished = HashSet.empty[ActorPath] + var contactPathsPublished = contactPaths var subscribers = Vector.empty[ActorRef] diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index e8ba308bac..9deeecdf83 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -326,6 +326,38 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-5") } + "report a removal of a receptionist" in within(10 seconds) { + runOn(client) { + val unreachableContact = node(client) / "system" / "receptionist" + val expectedRoles = Set(first, second, third, fourth) + val expectedContacts = expectedRoles.map(node(_) / "system" / "receptionist") + + // We need to slow down things otherwise our receptionists can sometimes tell us + // that our unreachableContact is unreachable before we get a chance to + // subscribe to events. + expectedRoles.foreach { role ⇒ + testConductor.blackhole(client, role, Direction.Both).await + } + + val c = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(expectedContacts + unreachableContact)), "client5") + + val probe = TestProbe() + c.tell(SubscribeContactPoints, probe.ref) + + expectedRoles.foreach { role ⇒ + testConductor.passThrough(client, role, Direction.Both).await + } + + within(10.seconds) { + awaitAssert { + probe.expectMsgType[ContactPointRemoved].contactPoint should ===(unreachableContact) + } + } + } + enterBarrier("after-7") + } + "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { runOn(first, second, third, fourth) { val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") @@ -415,7 +447,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod system.stop(c) } - enterBarrier("after-7") + enterBarrier("after-8") } "re-establish connection to receptionist after server restart" in within(30 seconds) {