* not worth investigating test failures of deprecated feature
This commit is contained in:
parent
14e213d5b2
commit
a4d04075b8
1 changed files with 8 additions and 150 deletions
|
|
@ -4,23 +4,19 @@
|
|||
|
||||
package akka.cluster.client
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import language.postfixOps
|
||||
|
||||
import akka.actor.{
|
||||
Actor,
|
||||
ActorPath,
|
||||
ActorRef,
|
||||
ActorSystem,
|
||||
Address,
|
||||
ExtendedActorSystem,
|
||||
NoSerializationVerificationNeeded,
|
||||
Props
|
||||
}
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorPath
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.actor.NoSerializationVerificationNeeded
|
||||
import akka.actor.Props
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.client.ClusterClientSpec.TestClientListener.LatestContactPoints
|
||||
import akka.cluster.client.ClusterClientSpec.TestReceptionistListener.LatestClusterClients
|
||||
|
|
@ -374,143 +370,5 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
enterBarrier("after-7")
|
||||
}
|
||||
|
||||
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
|
||||
runOn(first, second, third, fourth) {
|
||||
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
|
||||
ClusterClientReceptionist(system).registerService(service2)
|
||||
awaitCount(8)
|
||||
}
|
||||
enterBarrier("service2-replicated")
|
||||
|
||||
runOn(client) {
|
||||
val client =
|
||||
system.actorOf(
|
||||
ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
|
||||
"client2")
|
||||
|
||||
client ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
|
||||
val reply = expectMsgType[Reply]
|
||||
reply.msg should be("bonjour-ack")
|
||||
val receptionistRoleName = roleName(reply.node) match {
|
||||
case Some(r) => r
|
||||
case None => fail("unexpected missing roleName: " + reply.node)
|
||||
}
|
||||
testConductor.exit(receptionistRoleName, 0).await
|
||||
remainingServerRoleNames -= receptionistRoleName
|
||||
awaitAssert({
|
||||
client ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true)
|
||||
expectMsgType[Reply](1 second).msg should be("hi again-ack")
|
||||
}, max = remaining - 3.seconds)
|
||||
system.stop(client)
|
||||
}
|
||||
enterBarrier("verifed-3")
|
||||
receiveWhile(2 seconds) {
|
||||
case "hi again" =>
|
||||
case other => fail("unexpected message: " + other)
|
||||
}
|
||||
enterBarrier("verifed-4")
|
||||
runOn(client) {
|
||||
// Locate the test listener from a previous test and see that it agrees
|
||||
// with what the client is telling it about what receptionists are alive
|
||||
val listener = system.actorSelection("/user/reporter-client-listener")
|
||||
val expectedContacts = remainingServerRoleNames.map(node(_) / "system" / "receptionist")
|
||||
awaitAssert({
|
||||
listener ! TestClientListener.GetLatestContactPoints
|
||||
expectMsgType[LatestContactPoints].contactPoints should ===(expectedContacts)
|
||||
}, max = 10.seconds)
|
||||
}
|
||||
enterBarrier("after-6")
|
||||
}
|
||||
|
||||
"re-establish connection to receptionist after partition" in within(30 seconds) {
|
||||
runOn(client) {
|
||||
val c = system.actorOf(
|
||||
ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
|
||||
"client3")
|
||||
|
||||
c ! ClusterClient.Send("/user/service2", "bonjour2", localAffinity = true)
|
||||
val reply = expectMsgType[Reply]
|
||||
reply.msg should be("bonjour2-ack")
|
||||
val receptionistRoleName = roleName(reply.node) match {
|
||||
case Some(r) => r
|
||||
case None => fail("unexpected missing roleName: " + reply.node)
|
||||
}
|
||||
// shutdown all but the one that the client is connected to
|
||||
remainingServerRoleNames.foreach { r =>
|
||||
if (r != receptionistRoleName)
|
||||
testConductor.exit(r, 0).await
|
||||
}
|
||||
remainingServerRoleNames = Set(receptionistRoleName)
|
||||
// network partition between client and server
|
||||
testConductor.blackhole(client, receptionistRoleName, Direction.Both).await
|
||||
c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true)
|
||||
// if we would use remote watch the failure detector would trigger and
|
||||
// connection quarantined
|
||||
expectNoMessage(5 seconds)
|
||||
|
||||
testConductor.passThrough(client, receptionistRoleName, Direction.Both).await
|
||||
|
||||
val expectedAddress = node(receptionistRoleName).address
|
||||
awaitAssert {
|
||||
val probe = TestProbe()
|
||||
c.tell(ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true), probe.ref)
|
||||
val reply = probe.expectMsgType[Reply](1 second)
|
||||
reply.msg should be("bonjour3-ack")
|
||||
reply.node should be(expectedAddress)
|
||||
}
|
||||
system.stop(c)
|
||||
}
|
||||
|
||||
enterBarrier("after-8")
|
||||
}
|
||||
|
||||
"re-establish connection to receptionist after server restart" in within(30 seconds) {
|
||||
runOn(client) {
|
||||
remainingServerRoleNames.size should ===(1)
|
||||
val remainingContacts = remainingServerRoleNames.map { r =>
|
||||
node(r) / "system" / "receptionist"
|
||||
}
|
||||
val c =
|
||||
system.actorOf(
|
||||
ClusterClient.props(ClusterClientSettings(system).withInitialContacts(remainingContacts)),
|
||||
"client4")
|
||||
|
||||
c ! ClusterClient.Send("/user/service2", "bonjour4", localAffinity = true)
|
||||
expectMsg(10.seconds, Reply("bonjour4-ack", remainingContacts.head.address))
|
||||
|
||||
val logSource = s"${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}/user/client4"
|
||||
|
||||
EventFilter.info(start = "Connected to", source = logSource, occurrences = 1).intercept {
|
||||
EventFilter.info(start = "Lost contact", source = logSource, occurrences = 1).intercept {
|
||||
// shutdown server
|
||||
testConductor.shutdown(remainingServerRoleNames.head).await
|
||||
}
|
||||
}
|
||||
|
||||
c ! ClusterClient.Send("/user/service2", "shutdown", localAffinity = true)
|
||||
Thread.sleep(2000) // to ensure that it is sent out before shutting down system
|
||||
}
|
||||
|
||||
// There is only one client JVM and one server JVM left. The other JVMs have been exited
|
||||
// by previous test steps. However, on the we don't know which server JVM that is left here
|
||||
// so we let the following run on all server JVMs, but there is actually only one alive.
|
||||
runOn(remainingServerRoleNames.toSeq: _*) {
|
||||
Await.ready(system.whenTerminated, 20.seconds)
|
||||
// start new system on same port
|
||||
val port = Cluster(system).selfAddress.port.get
|
||||
val sys2 = ActorSystem(
|
||||
system.name,
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.remote.artery.canonical.port=$port
|
||||
akka.remote.classic.netty.tcp.port=$port
|
||||
""").withFallback(system.settings.config))
|
||||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
|
||||
ClusterClientReceptionist(sys2).registerService(service2)
|
||||
Await.ready(sys2.whenTerminated, 20.seconds)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue