harden CoordinatedShutdownShardingSpec, #24113
* There might be one case where the singleton coordinator hand-over starts before the graceful stop of the region has completed on the other node.
* I think this is rare enough to just accept that a message might be sent to the wrong location (we don't guarantee anything more than best-effort delivery anyway).
* A safe rolling upgrade should keep the coordinator (the oldest node) until last to avoid such races.
parent 930d2e9133
commit 9e8c10b5a6
1 changed file with 18 additions and 17 deletions
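The race described in the commit message is about ordering across nodes: the graceful stop of the shard regions is one CoordinatedShutdown phase, while the hand-over of the coordinator singleton happens around the time the oldest member exits the cluster. A minimal sketch (not part of this commit; the system and task names are illustrative) of where those two steps sit in the shutdown phases on a single node:

// Sketch only: system name and task names are assumptions, not code from this commit.
import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }
import scala.concurrent.Future

object ShutdownOrderingSketch extends App {
  val system = ActorSystem("shutdown-ordering-sketch")
  val shutdown = CoordinatedShutdown(system)

  // Phase in which a graceful stop hands off the shards owned by this node's regions.
  shutdown.addTask(CoordinatedShutdown.PhaseClusterShardingShutdownRegion, "observe-region-stop") { () =>
    system.log.info("shard regions are being stopped gracefully")
    Future.successful(Done)
  }

  // Later phase, when the member exits the cluster; if this node is the oldest,
  // this is roughly when singletons (including the shard coordinator) are handed over.
  shutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "observe-exiting") { () =>
    system.log.info("exiting the cluster")
    Future.successful(Done)
  }
}

Because each node runs these phases on its own timeline, a region on a surviving node may still route messages toward the old coordinator location while hand-over is in progress, which is the best-effort window the commit message accepts.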
@@ -72,24 +72,25 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
 
   // Using region 2 as it is not shutdown in either test
   def pingEntities(): Unit = {
-    region2.tell(1, probe2.ref)
-    probe2.expectMsg(10.seconds, 1)
-    region2.tell(2, probe2.ref)
-    probe2.expectMsg(10.seconds, 2)
-    region2.tell(3, probe2.ref)
-    probe2.expectMsg(10.seconds, 3)
+    awaitAssert({
+      val p1 = TestProbe()(sys2)
+      region2.tell(1, p1.ref)
+      p1.expectMsg(1.seconds, 1)
+      val p2 = TestProbe()(sys2)
+      region2.tell(2, p2.ref)
+      p2.expectMsg(1.seconds, 2)
+      val p3 = TestProbe()(sys2)
+      region2.tell(3, p3.ref)
+      p3.expectMsg(1.seconds, 3)
+    }, 10.seconds)
   }
 
   "Sharding and CoordinatedShutdown" must {
     "init cluster" in {
-      // FIXME this test should also work when coordinator is on the leaving sys1 node,
-      // but currently there seems to be a race between the CS and the ClusterSingleton observing OldestChanged
-      // and terminating coordinator singleton before the graceful sharding stop is done.
+      Cluster(sys1).join(Cluster(sys1).selfAddress) // coordinator will initially run on sys1
+      awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up))
 
-      Cluster(sys2).join(Cluster(sys2).selfAddress) // coordinator will initially run on sys2
-      awaitAssert(Cluster(sys2).selfMember.status should ===(MemberStatus.Up))
-
-      Cluster(sys1).join(Cluster(sys2).selfAddress)
+      Cluster(sys2).join(Cluster(sys1).selfAddress)
       within(10.seconds) {
         awaitAssert {
           Cluster(sys1).state.members.size should ===(2)
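The rewritten pingEntities above wraps the whole ping in awaitAssert and uses a fresh TestProbe per attempt, so a timed-out expectMsg from one attempt cannot leave a late reply in a probe that the next attempt reuses. A standalone sketch of that idiom, with a hypothetical echo actor standing in for the shard region:

// Sketch only: "echo" and the system name are assumptions used for illustration.
import akka.actor.{ Actor, ActorSystem, Props }
import akka.testkit.{ TestKit, TestProbe }
import scala.concurrent.duration._

object PingRetrySketch extends App {
  val system = ActorSystem("ping-retry-sketch")
  val echo = system.actorOf(Props(new Actor { def receive = { case m => sender() ! m } }), "echo")

  new TestKit(system) {
    awaitAssert({
      // a fresh probe per attempt: a timed-out expectMsg cannot leave a late
      // reply sitting in the queue of a probe that the next attempt reuses
      val probe = TestProbe()(system)
      echo.tell("ping", probe.ref)
      probe.expectMsg(1.second, "ping")
    }, 10.seconds)
  }

  TestKit.shutdownActorSystem(system)
}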
@@ -116,9 +117,9 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
 
     "run coordinated shutdown when leaving" in {
       Cluster(sys3).leave(Cluster(sys1).selfAddress)
-      probe1.expectMsg("CS-unbind-1")
+      probe1.expectMsg(10.seconds, "CS-unbind-1")
 
-      within(10.seconds) {
+      within(20.seconds) {
         awaitAssert {
           Cluster(sys2).state.members.size should ===(2)
           Cluster(sys3).state.members.size should ===(2)
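The CS-unbind-1 message that probe1 now waits up to 10 seconds for presumably comes from a task the spec registers in CoordinatedShutdown's service-unbind phase; that registration is outside the hunks shown here, but it would look roughly like this (sys1 and probe1 recreated as stand-ins for the spec's fixtures):

// Sketch only: an assumption about how the spec wires the CS-unbind-* probes.
import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }
import akka.testkit.TestProbe
import scala.concurrent.Future

object UnbindTaskSketch extends App {
  val sys1 = ActorSystem("sys1")
  val probe1 = TestProbe()(sys1)

  // A task in the early service-unbind phase; when CoordinatedShutdown runs on a
  // graceful leave, the probe receives this message before the cluster phases start.
  CoordinatedShutdown(sys1).addTask(CoordinatedShutdown.PhaseServiceUnbind, "unbind-1") { () =>
    probe1.ref ! "CS-unbind-1"
    Future.successful(Done)
  }
}

Widening within to 20 seconds gives the leaving node time to run the whole shutdown sequence before the membership assertions stop being retried.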
@@ -137,9 +138,9 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
     "run coordinated shutdown when downing" in {
       // coordinator is on sys2
       Cluster(sys2).down(Cluster(sys3).selfAddress)
-      probe3.expectMsg("CS-unbind-3")
+      probe3.expectMsg(10.seconds, "CS-unbind-3")
 
-      within(10.seconds) {
+      within(20.seconds) {
         awaitAssert {
           Cluster(sys2).state.members.size should ===(1)
         }
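The downing variant exercises CoordinatedShutdown running when the member is downed and removed, not only on a graceful leave. In Akka that behaviour is controlled by a cluster setting along these lines; showing it here is an assumption about the spec's configuration, which this diff does not include:

// Sketch only: configuration assumed for illustration, not taken from the spec.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object DowningShutdownSketch extends App {
  val config = ConfigFactory.parseString(
    """
    # run the full CoordinatedShutdown (including the graceful region stop)
    # when this member is downed/removed, not only on a graceful leave
    akka.cluster.run-coordinated-shutdown-when-down = on
    """)
  val sys3 = ActorSystem("sys3", config.withFallback(ConfigFactory.load()))
}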