=clt #18459 Add ClusterClient test for quick restart
This commit is contained in:
parent f74fbc354d
commit 15bcd5c41f
1 changed file with 51 additions and 2 deletions
@@ -9,6 +9,8 @@ import com.typesafe.config.ConfigFactory
 import akka.actor.Actor
 import akka.actor.ActorPath
 import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.ExtendedActorSystem
 import akka.actor.Props
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
@@ -20,6 +22,7 @@ import akka.testkit._
 import akka.actor.Address
 import akka.cluster.pubsub._
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import scala.concurrent.Await

 object ClusterClientSpec extends MultiNodeConfig {
   val client = role("client")
@@ -37,6 +40,7 @@ object ClusterClientSpec extends MultiNodeConfig {
     akka.cluster.client.acceptable-heartbeat-pause = 3s
     # number-of-contacts must be >= 4 because we shutdown all but one in the end
     akka.cluster.client.number-of-contacts = 4
+    akka.test.filter-leeway = 10s
     """))

   testTransport(on = true)
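
For context on the akka.cluster.client settings tuned above: a ClusterClient is constructed with an explicit set of initial receptionist contact points, and number-of-contacts bounds how many contacts the receptionist hands back to the client. A minimal sketch of that construction, not part of this commit; the system name, host names, and ports below are invented for illustration:

// Hedged sketch: constructing a ClusterClient with explicit initial
// contacts. Addresses are invented; only the API shape is the point.
import akka.actor.{ ActorPath, ActorSystem }
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

object ClientSketch extends App {
  val system = ActorSystem("client-side")
  val initialContacts: Set[ActorPath] = Set(
    ActorPath.fromString("akka.tcp://ClusterSystem@host1:2552/system/receptionist"),
    ActorPath.fromString("akka.tcp://ClusterSystem@host2:2552/system/receptionist"))
  val c = system.actorOf(
    ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
    "client")
  c ! ClusterClient.Send("/user/service2", "hello", localAffinity = true)
}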
@@ -45,6 +49,8 @@ object ClusterClientSpec extends MultiNodeConfig {
 class TestService(testActor: ActorRef) extends Actor {
   def receive = {
+    case "shutdown" ⇒
+      context.system.terminate()
     case msg ⇒
       testActor forward msg
       sender() ! Reply(msg + "-ack", Cluster(context.system).selfAddress)
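
The new "shutdown" case is what lets the client side terminate a server's ActorSystem remotely, enabling the quick-restart scenario added below. A small local sketch of that behavior, assuming TestService from this spec is in scope; the system name and the dead-letters stand-in for testActor are assumptions:

// Hedged local sketch: "shutdown" matches before the generic case and
// terminates the system; whenTerminated then completes.
import akka.actor.{ ActorSystem, Props }
import scala.concurrent.Await
import scala.concurrent.duration._

object ShutdownSketch extends App {
  val sys = ActorSystem("sketch")
  val service = sys.actorOf(Props(classOf[TestService], sys.deadLetters), "service")
  service ! "shutdown"                        // triggers context.system.terminate()
  Await.ready(sys.whenTerminated, 10.seconds) // completes once the system is down
}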
@@ -245,8 +251,9 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
       val expectedAddress = node(receptionistRoleName).address
       awaitAssert {
-        c ! ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true)
-        val reply = expectMsgType[Reply](1 second)
+        val probe = TestProbe()
+        c.tell(ClusterClient.Send("/user/service2", "bonjour3", localAffinity = true), probe.ref)
+        val reply = probe.expectMsgType[Reply](1 second)
         reply.msg should be("bonjour3-ack")
         reply.node should be(expectedAddress)
       }
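
The switch from the implicit test-actor sender to a fresh TestProbe matters because awaitAssert retries its block: a late reply to an earlier, failed attempt would land in the shared test-actor mailbox and could satisfy a later expectMsgType spuriously. A fresh probe per attempt gives each retry its own mailbox. A self-contained sketch of the pattern, with an invented Echo actor and timings that are assumptions, not taken from this spec:

// Hedged sketch of the probe-per-retry pattern.
import akka.actor.{ Actor, ActorSystem, Props }
import akka.testkit.{ TestKit, TestProbe }
import scala.concurrent.duration._

class Echo extends Actor {
  def receive = { case m ⇒ sender() ! m }
}

object ProbePerRetrySketch extends App {
  val system = ActorSystem("sketch")
  val kit = new TestKit(system)
  val echo = system.actorOf(Props[Echo], "echo")
  kit.awaitAssert {
    val probe = TestProbe()(system)     // fresh mailbox for every retry
    echo.tell("hello", probe.ref)
    probe.expectMsg(500.millis, "hello") // stale replies can't leak in here
  }
  TestKit.shutdownActorSystem(system)
}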
@@ -256,5 +263,47 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
       enterBarrier("after-5")
     }

+    "re-establish connection to receptionist after server restart" in within(30 seconds) {
+      runOn(client) {
+        remainingServerRoleNames.size should ===(1)
+        val remainingContacts = remainingServerRoleNames.map { r ⇒
+          node(r) / "system" / "receptionist"
+        }
+        val c = system.actorOf(ClusterClient.props(
+          ClusterClientSettings(system).withInitialContacts(remainingContacts)), "client4")
+
+        c ! ClusterClient.Send("/user/service2", "bonjour4", localAffinity = true)
+        expectMsg(10.seconds, Reply("bonjour4-ack", remainingContacts.head.address))
+
+        val logSource = s"${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}/user/client4"
+
+        EventFilter.info(start = "Connected to", source = logSource, occurrences = 1) intercept {
+          EventFilter.info(start = "Lost contact", source = logSource, occurrences = 1) intercept {
+            // shut down the server
+            testConductor.shutdown(remainingServerRoleNames.head).await
+          }
+        }
+
+        c ! ClusterClient.Send("/user/service2", "shutdown", localAffinity = true)
+        Thread.sleep(2000) // to ensure the message is sent out before the system shuts down
+      }
+
+      // There is only one client JVM and one server JVM left. The other JVMs have been exited
+      // by previous test steps. However, we don't know which server JVM is left here, so we
+      // let the following run on all server JVMs, even though only one is actually alive.
+      runOn(remainingServerRoleNames.toSeq: _*) {
+        Await.ready(system.whenTerminated, 20.seconds)
+        // start a new system on the same port
+        val sys2 = ActorSystem(system.name,
+          ConfigFactory.parseString("akka.remote.netty.tcp.port=" + Cluster(system).selfAddress.port.get)
+            .withFallback(system.settings.config))
+        Cluster(sys2).join(Cluster(sys2).selfAddress)
+        val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
+        ClusterClientReceptionist(sys2).registerService(service2)
+        Await.ready(sys2.whenTerminated, 20.seconds)
+      }
+    }
   }
 }
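
The server-side block above restarts an ActorSystem on the exact port the old one used, so the client's contact path, which embeds host:port, stays valid across the restart. Distilled into a helper under the same classic-remoting config key this spec uses; the object and method names are invented:

// Hedged sketch: reuse the terminated system's TCP port so remote actor
// paths pointing at it remain valid after the restart.
import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object RestartOnSamePort {
  def restart(old: ActorSystem): ActorSystem = {
    val port = Cluster(old).selfAddress.port.get // port the old system was bound to
    val sys2 = ActorSystem(old.name,
      ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
        .withFallback(old.settings.config))
    Cluster(sys2).join(Cluster(sys2).selfAddress) // form a fresh single-node cluster
    sys2
  }
}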