diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 19d97bb07f..7ff0ebbc5a 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -9,6 +9,8 @@ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorPath import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ClusterEvent._ @@ -20,6 +22,7 @@ import akka.testkit._ import akka.actor.Address import akka.cluster.pubsub._ import akka.remote.transport.ThrottlerTransportAdapter.Direction +import scala.concurrent.Await object ClusterClientSpec extends MultiNodeConfig { val client = role("client") @@ -37,6 +40,7 @@ object ClusterClientSpec extends MultiNodeConfig { akka.cluster.client.acceptable-heartbeat-pause = 3s # number-of-contacts must be >= 4 because we shutdown all but one in the end akka.cluster.client.number-of-contacts = 4 + akka.test.filter-leeway = 10s """)) testTransport(on = true) @@ -45,6 +49,8 @@ object ClusterClientSpec extends MultiNodeConfig { class TestService(testActor: ActorRef) extends Actor { def receive = { + case "shutdown" ⇒ + context.system.terminate() case msg ⇒ testActor forward msg sender() ! Reply(msg + "-ack", Cluster(context.system).selfAddress) @@ -245,8 +251,9 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod val expectedAddress = node(receptionistRoleName).address awaitAssert { - c ! ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true) - val reply = expectMsgType[Reply](1 second) + val probe = TestProbe() + c.tell(ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true), probe.ref) + val reply = probe.expectMsgType[Reply](1 second) reply.msg should be("bonjour3-ack") reply.node should be(expectedAddress) } @@ -256,5 +263,47 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-5") } + "re-establish connection to receptionist after server restart" in within(30 seconds) { + runOn(client) { + remainingServerRoleNames.size should ===(1) + val remainingContacts = remainingServerRoleNames.map { r ⇒ + node(r) / "system" / "receptionist" + } + val c = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(remainingContacts)), "client4") + + c ! ClusterClient.Send("/user/service2", "bonjour4", localAffinity = true) + expectMsg(10.seconds, Reply("bonjour4-ack", remainingContacts.head.address)) + + val logSource = s"${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}/user/client4" + + EventFilter.info(start = "Connected to", source = logSource, occurrences = 1) intercept { + EventFilter.info(start = "Lost contact", source = logSource, occurrences = 1) intercept { + // shutdown server + testConductor.shutdown(remainingServerRoleNames.head).await + } + } + + c ! ClusterClient.Send("/user/service2", "shutdown", localAffinity = true) + Thread.sleep(2000) // to ensure that it is sent out before shutting down system + } + + // There is only one client JVM and one server JVM left. The other JVMs have been exited + // by previous test steps. However, on the we don't know which server JVM that is left here + // so we let the following run on all server JVMs, but there is actually only one alive. + runOn(remainingServerRoleNames.toSeq: _*) { + Await.ready(system.whenTerminated, 20.seconds) + // start new system on same port + val sys2 = ActorSystem(system.name, + ConfigFactory.parseString("akka.remote.netty.tcp.port=" + Cluster(system).selfAddress.port.get) + .withFallback(system.settings.config)) + Cluster(sys2).join(Cluster(sys2).selfAddress) + val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2") + ClusterClientReceptionist(sys2).registerService(service2) + Await.ready(sys2.whenTerminated, 20.seconds) + } + + } + } }