diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala index 01a43a1616..8b3c6d4574 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala @@ -55,8 +55,6 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: S val first = role("first") val second = role("second") val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") commonConfig(ConfigFactory.parseString(s""" akka.loglevel = DEBUG @@ -102,8 +100,6 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: S nodeConfig(second)(roleConfig.withFallback(ddataNodeAConfig)) nodeConfig(third)(roleConfig.withFallback(ddataNodeBConfig)) - nodeConfig(fourth)(roleConfig.withFallback(ddataNodeAConfig)) - nodeConfig(fifth)(roleConfig.withFallback(ddataNodeBConfig)) } @@ -118,8 +114,6 @@ class PersistentClusterShardingRememberEntitiesNewExtractorSpec extends ClusterS class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec -class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec -class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec( DDataClusterShardingRememberEntitiesNewExtractorSpecConfig) @@ -127,8 +121,6 @@ class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardi class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends DDataClusterShardingRememberEntitiesNewExtractorSpec class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends DDataClusterShardingRememberEntitiesNewExtractorSpec class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec -class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends DDataClusterShardingRememberEntitiesNewExtractorSpec -class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends DDataClusterShardingRememberEntitiesNewExtractorSpec abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingRememberEntitiesNewExtractorSpec._ @@ -168,16 +160,16 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh extractShardId = extractShardId1) } - def startShardingWithExtractor2(): Unit = { - ClusterSharding(system).start( + def startShardingWithExtractor2(sys: ActorSystem, probe: ActorRef): Unit = { + ClusterSharding(sys).start( typeName = typeName, - entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(testActor)), + entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(probe)), settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"), extractEntityId = extractEntityId, extractShardId = extractShardId2) } - lazy val region = ClusterSharding(system).shardRegion(typeName) + def region(sys: ActorSystem = system) = ClusterSharding(sys).shardRegion(typeName) def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData @@ -192,7 +184,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh } enterBarrier("persistence-started") - runOn(second, third, fourth, fifth) { + runOn(second, third) { system.actorSelection(node(first) / "user" / "store") ! Identify(None) val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get SharedLeveldbJournal.setStore(sharedStore, system) @@ -222,7 +214,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh runOn(second, third) { // one entity for each shard id (1 to 10).foreach { n ⇒ - region ! n + region() ! n expectMsg(n) } } @@ -244,35 +236,40 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh enterBarrier("first-sharding-cluster-stopped") } - "start new nodes with different extractor" in within(15.seconds) { + "start new nodes with different extractor, and have the entities running on the right shards" in within(30.seconds) { // start it with a new shard id extractor, which will put the entities // on different shards - join(fourth, first) - join(fifth, first) - runOn(first) { - within(remaining) { - awaitAssert { - cluster.state.members.count(_.status == MemberStatus.Up) should ===(3) - } + runOn(second, third) { + watch(region()) + Cluster(system).leave(Cluster(system).selfAddress) + expectTerminated(region()) + awaitAssert { + Cluster(system).isTerminated should ===(true) } } - runOn(fourth, fifth) { - startShardingWithExtractor2() - } + enterBarrier("first-cluster-terminated") - // TODO how do we know that the shards has started?? - Thread.sleep(7000) - enterBarrier("new-nodes-started") - } + // no sharding nodes left of the original cluster, start a new nodes + runOn(second, third) { + val sys2 = ActorSystem(system.name, system.settings.config) + val probe2 = TestProbe()(sys2) + + if (!isDdataMode) { + sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref) + val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, sys2) + } + + Cluster(sys2).join(node(first).address) + startShardingWithExtractor2(sys2, probe2.ref) + probe2.expectMsgType[Started](20.seconds) - "have the remembered entities running on the right shards" in within(15.seconds) { - runOn(fourth, fifth) { var stats: ShardRegion.CurrentShardRegionState = null - within(remaining) { + within(10.seconds) { awaitAssert { - region ! ShardRegion.GetShardRegionState + region(sys2) ! ShardRegion.GetShardRegionState val reply = expectMsgType[ShardRegion.CurrentShardRegionState] reply.shards should not be empty stats = reply @@ -286,6 +283,13 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh val calculatedShardId = extractShardId2(entityId.toInt) calculatedShardId should ===(shardState.shardId) } + + enterBarrier("verified") + shutdown(sys2) + } + + runOn(first) { + enterBarrier("verified") } enterBarrier("done") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index c46f0e5150..8888d5fa7d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -225,6 +225,8 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb Cluster(sys2).join(Cluster(sys2).selfAddress) startSharding(sys2, probe2.ref) probe2.expectMsgType[Started](20.seconds) + + shutdown(sys2) } enterBarrier("after-3") }