diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 1e83913a40..56ba11ebc8 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -68,7 +68,6 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String, val r commonConfig( modeConfig .withFallback(ConfigFactory.parseString(s""" - akka.loglevel = DEBUG akka.actor.provider = "cluster" akka.cluster.auto-down-unreachable-after = 0s akka.remote.log-remote-lifecycle-events = off diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala new file mode 100644 index 0000000000..4f2aa7e9c2 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.cluster.MultiNodeClusterSpec +import akka.persistence.journal.leveldb.SharedLeveldbJournal +import akka.remote.testkit.MultiNodeConfig +import akka.testkit.AkkaSpec +import com.typesafe.config.{ Config, ConfigFactory } + +/** + * A MultiNodeConfig for ClusterSharding. Implement the roles, etc. and create with the following: + * + * @param mode the state store mode + * @param rememberEntities defaults to off + * @param overrides additional config + * @param loglevel defaults to INFO + */ +abstract class MultiNodeClusterShardingConfig( + val mode: String = ClusterShardingSettings.StateStoreModeDData, + val rememberEntities: Boolean = false, + overrides: Config = ConfigFactory.empty, + loglevel: String = "INFO") + extends MultiNodeConfig { + + val targetDir = s"target/ClusterSharding${AkkaSpec.getCallerName(getClass)}Spec-$mode-remember-$rememberEntities" + + val modeConfig = + if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty + else ConfigFactory.parseString(s""" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared.timeout = 5s + akka.persistence.journal.leveldb-shared.store.native = off + akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots" + """) + + commonConfig( + overrides + .withFallback(modeConfig) + .withFallback(ConfigFactory.parseString(s""" + akka.loglevel = $loglevel + akka.actor.provider = "cluster" + akka.cluster.auto-down-unreachable-after = 0s + akka.remote.log-remote-lifecycle-events = off + akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = $targetDir/sharding-ddata + map-size = 10 MiB + } + """)) + .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) + .withFallback(MultiNodeClusterSpec.clusterConfig)) + +} diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala new file mode 100644 index 0000000000..39975c6e18 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding + +import java.io.File + +import scala.concurrent.duration._ + +import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Identify, PoisonPill, Props } +import akka.cluster.{ Cluster, MultiNodeClusterSpec } +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeSpec +import akka.testkit.TestProbe +import akka.util.ccompat.ccompatUsedUntil213 +import org.apache.commons.io.FileUtils + +@ccompatUsedUntil213 +object MultiNodeClusterShardingSpec { + + final case class EntityStarted(ref: ActorRef) + + def props(probe: ActorRef): Props = Props(new EntityActor(probe)) + + class EntityActor(probe: ActorRef) extends Actor { + probe ! EntityStarted(self) + + def receive: Receive = { + case m => sender() ! m + } + } + + val defaultExtractEntityId: ShardRegion.ExtractEntityId = { + case id: Int => (id.toString, id) + } + + val defaultExtractShardId: ShardRegion.ExtractShardId = msg => + msg match { + case id: Int => id.toString + case ShardRegion.StartEntity(id) => id + } + +} + +abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterShardingConfig) + extends MultiNodeSpec(config) + with MultiNodeClusterSpec { + + import MultiNodeClusterShardingSpec._ + import config._ + + override def initialParticipants: Int = roles.size + + protected val storageLocations = List( + new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) + + override protected def atStartup(): Unit = { + storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") + super.atStartup() + } + + override protected def afterTermination(): Unit = { + storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) + super.afterTermination() + } + + protected def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system).join(node(to).address) + awaitAssert { + Cluster(system).state.isMemberUp(node(from).address) + } + } + enterBarrier(from.name + "-joined") + } + + protected def startSharding( + sys: ActorSystem, + entityProps: Props, + dataType: String, + extractEntityId: ShardRegion.ExtractEntityId = defaultExtractEntityId, + extractShardId: ShardRegion.ExtractShardId = defaultExtractShardId, + handOffStopMessage: Any = PoisonPill): ActorRef = { + + ClusterSharding(sys).start( + typeName = dataType, + entityProps = entityProps, + settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities), + extractEntityId = extractEntityId, + extractShardId = extractShardId, + ClusterSharding(sys).defaultShardAllocationStrategy(ClusterShardingSettings(sys)), + handOffStopMessage) + } + + protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + + private def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit = + if (!isDdataMode) { + val probe = TestProbe()(sys) + sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref) + val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, sys) + } + + /** + * {{{ + * startPersistence(startOn = first, setStoreOn = Seq(first, second, third)) + * }}} + * + * @param startOn the node to start the `SharedLeveldbStore` store on + * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on + */ + protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = + if (!isDdataMode) { + + Persistence(system) + runOn(startOn) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("persistence-started") + + runOn(setStoreOn: _*) { + setStoreIfNotDdataMode(system, startOn) + } + + enterBarrier(s"after-${startOn.name}") + + } + +}