diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala index 0f65376039..6ad3f522d5 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -72,24 +72,25 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi // Using region 2 as it is not shutdown in either test def pingEntities(): Unit = { - region2.tell(1, probe2.ref) - probe2.expectMsg(10.seconds, 1) - region2.tell(2, probe2.ref) - probe2.expectMsg(10.seconds, 2) - region2.tell(3, probe2.ref) - probe2.expectMsg(10.seconds, 3) + awaitAssert({ + val p1 = TestProbe()(sys2) + region2.tell(1, p1.ref) + p1.expectMsg(1.seconds, 1) + val p2 = TestProbe()(sys2) + region2.tell(2, p2.ref) + p2.expectMsg(1.seconds, 2) + val p3 = TestProbe()(sys2) + region2.tell(3, p3.ref) + p3.expectMsg(1.seconds, 3) + }, 10.seconds) } "Sharding and CoordinatedShutdown" must { "init cluster" in { - // FIXME this test should also work when coordinator is on the leaving sys1 node, - // but currently there seems to be a race between the CS and the ClusterSingleton observing OldestChanged - // and terminating coordinator singleton before the graceful sharding stop is done. + Cluster(sys1).join(Cluster(sys1).selfAddress) // coordinator will initially run on sys1 + awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up)) - Cluster(sys2).join(Cluster(sys2).selfAddress) // coordinator will initially run on sys2 - awaitAssert(Cluster(sys2).selfMember.status should ===(MemberStatus.Up)) - - Cluster(sys1).join(Cluster(sys2).selfAddress) + Cluster(sys2).join(Cluster(sys1).selfAddress) within(10.seconds) { awaitAssert { Cluster(sys1).state.members.size should ===(2) @@ -116,9 +117,9 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi "run coordinated shutdown when leaving" in { Cluster(sys3).leave(Cluster(sys1).selfAddress) - probe1.expectMsg("CS-unbind-1") + probe1.expectMsg(10.seconds, "CS-unbind-1") - within(10.seconds) { + within(20.seconds) { awaitAssert { Cluster(sys2).state.members.size should ===(2) Cluster(sys3).state.members.size should ===(2) @@ -137,9 +138,9 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi "run coordinated shutdown when downing" in { // coordinator is on sys2 Cluster(sys2).down(Cluster(sys3).selfAddress) - probe3.expectMsg("CS-unbind-3") + probe3.expectMsg(10.seconds, "CS-unbind-3") - within(10.seconds) { + within(20.seconds) { awaitAssert { Cluster(sys2).state.members.size should ===(1) }