diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index fe14bb4fc7..1fc5f1ce97 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -6,13 +6,11 @@ package akka.cluster.sharding import scala.concurrent.duration._ import scala.language.postfixOps - import com.typesafe.config.ConfigFactory - import akka.actor._ import akka.cluster.Cluster import akka.cluster.ddata.{ Replicator, ReplicatorSettings } -import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped } +import akka.cluster.sharding.ShardCoordinator.Internal.{ BeginHandOff, BeginHandOffAck, HandOff, ShardStopped } import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate } import akka.cluster.sharding.internal.{ DDataRememberEntitiesProvider, EventSourcedRememberEntitiesProvider } @@ -766,16 +764,28 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) } enterBarrier("persistent-started") + // watch-out, these two var are only init on 3rd node + var shard: ActorSelection = null + var region: ActorSelection = null runOn(third) { //Create an increment counter 1 persistentEntitiesRegion ! EntityEnvelope(1, Increment) persistentEntitiesRegion ! Get(1) expectMsg(1) - //Shut down the shard and confirm it's dead - val shard = system.actorSelection(lastSender.path.parent) - val region = system.actorSelection(lastSender.path.parent.parent) + shard = system.actorSelection(lastSender.path.parent) + region = system.actorSelection(lastSender.path.parent.parent) + } + enterBarrier("counter-incremented") + // clean up shard cache everywhere + runOn(third, fourth, fifth) { + persistentEntitiesRegion ! BeginHandOff("1") + expectMsg(10 seconds, "ShardStopped not received", BeginHandOffAck("1")) + } + enterBarrier("everybody-hand-off-ack") + + runOn(third) { //Stop the shard cleanly region ! HandOff("1") expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1")) @@ -787,8 +797,10 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) }, 5 seconds, 500 millis) //Get the path to where the shard now resides - persistentEntitiesRegion ! Get(13) - expectMsg(0) + awaitAssert({ + persistentEntitiesRegion ! Get(13) + expectMsg(0) + }, 5 seconds, 500 millis) //Check that counter 1 is now alive again, even though we have // not sent a message to it via the ShardRegion @@ -804,7 +816,6 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) counter1 ! Get(1) expectMsg(1) } - enterBarrier("after-shard-restart") runOn(fourth) {