diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 7688cff9d7..93e2461aea 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -4,23 +4,19 @@ package akka.cluster.client +import scala.annotation.nowarn import scala.concurrent.Await import scala.concurrent.duration._ +import scala.language.postfixOps -import scala.annotation.nowarn import com.typesafe.config.ConfigFactory -import language.postfixOps -import akka.actor.{ - Actor, - ActorPath, - ActorRef, - ActorSystem, - Address, - ExtendedActorSystem, - NoSerializationVerificationNeeded, - Props -} +import akka.actor.Actor +import akka.actor.ActorPath +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.Props import akka.cluster.Cluster import akka.cluster.client.ClusterClientSpec.TestClientListener.LatestContactPoints import akka.cluster.client.ClusterClientSpec.TestReceptionistListener.LatestClusterClients @@ -374,143 +370,5 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-7") } - "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { - runOn(first, second, third, fourth) { - val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") - ClusterClientReceptionist(system).registerService(service2) - awaitCount(8) - } - enterBarrier("service2-replicated") - - runOn(client) { - val client = - system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client2") - - client ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) - val reply = expectMsgType[Reply] - reply.msg should be("bonjour-ack") - val receptionistRoleName = roleName(reply.node) match { - case Some(r) => r - case None => fail("unexpected missing roleName: " + reply.node) - } - testConductor.exit(receptionistRoleName, 0).await - remainingServerRoleNames -= receptionistRoleName - awaitAssert({ - client ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true) - expectMsgType[Reply](1 second).msg should be("hi again-ack") - }, max = remaining - 3.seconds) - system.stop(client) - } - enterBarrier("verifed-3") - receiveWhile(2 seconds) { - case "hi again" => - case other => fail("unexpected message: " + other) - } - enterBarrier("verifed-4") - runOn(client) { - // Locate the test listener from a previous test and see that it agrees - // with what the client is telling it about what receptionists are alive - val listener = system.actorSelection("/user/reporter-client-listener") - val expectedContacts = remainingServerRoleNames.map(node(_) / "system" / "receptionist") - awaitAssert({ - listener ! TestClientListener.GetLatestContactPoints - expectMsgType[LatestContactPoints].contactPoints should ===(expectedContacts) - }, max = 10.seconds) - } - enterBarrier("after-6") - } - - "re-establish connection to receptionist after partition" in within(30 seconds) { - runOn(client) { - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client3") - - c ! ClusterClient.Send("/user/service2", "bonjour2", localAffinity = true) - val reply = expectMsgType[Reply] - reply.msg should be("bonjour2-ack") - val receptionistRoleName = roleName(reply.node) match { - case Some(r) => r - case None => fail("unexpected missing roleName: " + reply.node) - } - // shutdown all but the one that the client is connected to - remainingServerRoleNames.foreach { r => - if (r != receptionistRoleName) - testConductor.exit(r, 0).await - } - remainingServerRoleNames = Set(receptionistRoleName) - // network partition between client and server - testConductor.blackhole(client, receptionistRoleName, Direction.Both).await - c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true) - // if we would use remote watch the failure detector would trigger and - // connection quarantined - expectNoMessage(5 seconds) - - testConductor.passThrough(client, receptionistRoleName, Direction.Both).await - - val expectedAddress = node(receptionistRoleName).address - awaitAssert { - val probe = TestProbe() - c.tell(ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true), probe.ref) - val reply = probe.expectMsgType[Reply](1 second) - reply.msg should be("bonjour3-ack") - reply.node should be(expectedAddress) - } - system.stop(c) - } - - enterBarrier("after-8") - } - - "re-establish connection to receptionist after server restart" in within(30 seconds) { - runOn(client) { - remainingServerRoleNames.size should ===(1) - val remainingContacts = remainingServerRoleNames.map { r => - node(r) / "system" / "receptionist" - } - val c = - system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(remainingContacts)), - "client4") - - c ! ClusterClient.Send("/user/service2", "bonjour4", localAffinity = true) - expectMsg(10.seconds, Reply("bonjour4-ack", remainingContacts.head.address)) - - val logSource = s"${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}/user/client4" - - EventFilter.info(start = "Connected to", source = logSource, occurrences = 1).intercept { - EventFilter.info(start = "Lost contact", source = logSource, occurrences = 1).intercept { - // shutdown server - testConductor.shutdown(remainingServerRoleNames.head).await - } - } - - c ! ClusterClient.Send("/user/service2", "shutdown", localAffinity = true) - Thread.sleep(2000) // to ensure that it is sent out before shutting down system - } - - // There is only one client JVM and one server JVM left. The other JVMs have been exited - // by previous test steps. However, on the we don't know which server JVM that is left here - // so we let the following run on all server JVMs, but there is actually only one alive. - runOn(remainingServerRoleNames.toSeq: _*) { - Await.ready(system.whenTerminated, 20.seconds) - // start new system on same port - val port = Cluster(system).selfAddress.port.get - val sys2 = ActorSystem( - system.name, - ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.remote.classic.netty.tcp.port=$port - """).withFallback(system.settings.config)) - Cluster(sys2).join(Cluster(sys2).selfAddress) - val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2") - ClusterClientReceptionist(sys2).registerService(service2) - Await.ready(sys2.whenTerminated, 20.seconds) - } - - } - } }