diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 5d17f9de4e..1e83913a40 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -46,63 +46,86 @@ object ClusterShardingRememberEntitiesSpec { } -abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String, val rememberEntities: Boolean) + extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") + val targetDir = s"target/ClusterShardingRememberEntitiesSpec-$mode-remember-$rememberEntities" + + val modeConfig = + if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty + else ConfigFactory.parseString(s""" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared.timeout = 5s + akka.persistence.journal.leveldb-shared.store.native = off + akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots" + """) + commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = DEBUG - akka.actor.provider = "cluster" - akka.cluster.auto-down-unreachable-after = 0s - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ShardingRememberEntitiesSpec/journal" + modeConfig + .withFallback(ConfigFactory.parseString(s""" + akka.loglevel = DEBUG + akka.actor.provider = "cluster" + akka.cluster.auto-down-unreachable-after = 0s + akka.remote.log-remote-lifecycle-events = off + akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = $targetDir/sharding-ddata + map-size = 10 MiB } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ShardingRememberEntitiesSpec/sharding-ddata - map-size = 10 MiB - } - """) + akka.testconductor.barrier-timeout = 60 s + akka.test.single-expect-default = 60 s + """)) .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) .withFallback(MultiNodeClusterSpec.clusterConfig)) nodeConfig(third)(ConfigFactory.parseString(s""" akka.cluster.sharding.distributed-data.durable.lmdb { # use same directory when starting new node on third (not used at same time) - dir = target/ShardingRememberEntitiesSpec/sharding-third + dir = $targetDir/sharding-third } """)) + } -object PersistentClusterShardingRememberEntitiesSpecConfig - extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModePersistence) -object DDataClusterShardingRememberEntitiesSpecConfig - extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData) +class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpecConfig( + ClusterShardingSettings.StateStoreModePersistence, + rememberEntities) +class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities) -class PersistentClusterShardingRememberEntitiesSpec - extends ClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig) +abstract class PersistentClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpec( + new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities)) +abstract class DDataClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpec(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities)) -class PersistentClusterShardingRememberEntitiesMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesSpec -class PersistentClusterShardingRememberEntitiesMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesSpec -class PersistentClusterShardingRememberEntitiesMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesSpec +class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode1 + extends PersistentClusterShardingRememberEntitiesSpec(true) +class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode2 + extends PersistentClusterShardingRememberEntitiesSpec(true) +class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode3 + extends PersistentClusterShardingRememberEntitiesSpec(true) -class DDataClusterShardingRememberEntitiesSpec - extends ClusterShardingRememberEntitiesSpec(DDataClusterShardingRememberEntitiesSpecConfig) +class PersistentClusterShardingRememberEntitiesDefaultMultiJvmNode1 + extends PersistentClusterShardingRememberEntitiesSpec(false) +class PersistentClusterShardingRememberEntitiesDefaultMultiJvmNode2 + extends PersistentClusterShardingRememberEntitiesSpec(false) +class PersistentClusterShardingRememberEntitiesDefaultMultiJvmNode3 + extends PersistentClusterShardingRememberEntitiesSpec(false) -class DDataClusterShardingRememberEntitiesMultiJvmNode1 extends DDataClusterShardingRememberEntitiesSpec -class DDataClusterShardingRememberEntitiesMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec -class DDataClusterShardingRememberEntitiesMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec +class DDataClusterShardingRememberEntitiesEnabledMultiJvmNode1 extends DDataClusterShardingRememberEntitiesSpec(true) +class DDataClusterShardingRememberEntitiesEnabledMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(true) +class DDataClusterShardingRememberEntitiesEnabledMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(true) + +class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode1 extends DDataClusterShardingRememberEntitiesSpec(false) +class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(false) +class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(false) abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememberEntitiesSpecConfig) extends MultiNodeSpec(config) @@ -111,7 +134,9 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb import ClusterShardingRememberEntitiesSpec._ import config._ - override def initialParticipants = roles.size + override def initialParticipants: Int = roles.size + + val dataType = "Entity" val storageLocations = List( new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) @@ -128,25 +153,45 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system).join(node(to).address) + awaitAssert { + Cluster(system).state.isMemberUp(node(from).address) + } } enterBarrier(from.name + "-joined") } val cluster = Cluster(system) - def startSharding(sys: ActorSystem = system, probe: ActorRef = testActor): Unit = { + def startSharding(sys: ActorSystem, probe: ActorRef): ActorRef = { ClusterSharding(sys).start( - typeName = "Entity", + typeName = dataType, entityProps = ClusterShardingRememberEntitiesSpec.props(probe), - settings = ClusterShardingSettings(system).withRememberEntities(true), + settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities), extractEntityId = extractEntityId, extractShardId = extractShardId) } - lazy val region = ClusterSharding(system).shardRegion("Entity") + lazy val region = ClusterSharding(system).shardRegion(dataType) def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + def expectEntityRestarted(sys: ActorSystem, event: Int, probe: TestProbe, entityProbe: TestProbe): Started = { + if (!rememberEntities) { + probe.send(ClusterSharding(sys).shardRegion(dataType), event) + probe.expectMsg(1) + } + + entityProbe.expectMsgType[Started](30.seconds) + } + + def setStoreIfNotDdata(sys: ActorSystem): Unit = + if (!isDdataMode) { + val probe = TestProbe()(sys) + sys.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe.ref) + val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, sys) + } + s"Cluster sharding with remember entities ($mode)" must { if (!isDdataMode) { @@ -156,12 +201,10 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb runOn(first) { system.actorOf(Props[SharedLeveldbStore], "store") } - enterBarrier("peristence-started") + enterBarrier("persistence-started") runOn(first, second, third) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) + setStoreIfNotDdata(system) } enterBarrier("after-1") @@ -169,17 +212,20 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb } "start remembered entities when coordinator fail over" in within(30.seconds) { + val entityProbe = TestProbe() + val probe = TestProbe() join(second, second) runOn(second) { - startSharding() - region ! 1 - expectMsgType[Started] + startSharding(system, entityProbe.ref) + probe.send(region, 1) + probe.expectMsg(1) + entityProbe.expectMsgType[Started] } enterBarrier("second-started") join(third, second) runOn(third) { - startSharding() + startSharding(system, entityProbe.ref) } runOn(second, third) { within(remaining) { @@ -203,7 +249,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb enterBarrier("crash-second") runOn(third) { - expectMsgType[Started](remaining) + expectEntityRestarted(system, 1, probe, entityProbe) } enterBarrier("after-2") @@ -220,17 +266,16 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb // no nodes left of the original cluster, start a new cluster val sys2 = ActorSystem(system.name, system.settings.config) + val entityProbe2 = TestProbe()(sys2) val probe2 = TestProbe()(sys2) - if (!isDdataMode) { - sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref) - val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, sys2) - } + setStoreIfNotDdata(sys2) Cluster(sys2).join(Cluster(sys2).selfAddress) - startSharding(sys2, probe2.ref) - probe2.expectMsgType[Started](20.seconds) + + startSharding(sys2, entityProbe2.ref) + + expectEntityRestarted(sys2, 1, probe2, entityProbe2) shutdown(sys2) }