Corrected the setup of contact paths for subscribers #22991

The initial contact paths should be assumed as published as that's what client subscribers will receive upon subscription. Any changes to contact points are a delta to that.
This commit is contained in:
Christopher Hunt 2017-06-04 11:40:17 +02:00 committed by Johan Andrén
parent ac166da042
commit f4bf497536
2 changed files with 34 additions and 2 deletions

View file

@ -337,7 +337,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
var contacts = initialContactsSel var contacts = initialContactsSel
sendGetContacts() sendGetContacts()
var contactPathsPublished = HashSet.empty[ActorPath] var contactPathsPublished = contactPaths
var subscribers = Vector.empty[ActorRef] var subscribers = Vector.empty[ActorRef]

View file

@ -326,6 +326,38 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-5") enterBarrier("after-5")
} }
"report a removal of a receptionist" in within(10 seconds) {
runOn(client) {
val unreachableContact = node(client) / "system" / "receptionist"
val expectedRoles = Set(first, second, third, fourth)
val expectedContacts = expectedRoles.map(node(_) / "system" / "receptionist")
// We need to slow down things otherwise our receptionists can sometimes tell us
// that our unreachableContact is unreachable before we get a chance to
// subscribe to events.
expectedRoles.foreach { role
testConductor.blackhole(client, role, Direction.Both).await
}
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(expectedContacts + unreachableContact)), "client5")
val probe = TestProbe()
c.tell(SubscribeContactPoints, probe.ref)
expectedRoles.foreach { role
testConductor.passThrough(client, role, Direction.Both).await
}
within(10.seconds) {
awaitAssert {
probe.expectMsgType[ContactPointRemoved].contactPoint should ===(unreachableContact)
}
}
}
enterBarrier("after-7")
}
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
runOn(first, second, third, fourth) { runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
@ -415,7 +447,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
system.stop(c) system.stop(c)
} }
enterBarrier("after-7") enterBarrier("after-8")
} }
"re-establish connection to receptionist after server restart" in within(30 seconds) { "re-establish connection to receptionist after server restart" in within(30 seconds) {