diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 4054eb6190..851b7aa460 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -4,39 +4,18 @@ package akka.cluster.sharding -import scala.collection.immutable -import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory import akka.actor._ -import akka.cluster.{ Cluster, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.persistence.journal.leveldb.SharedLeveldbStore -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import akka.testkit._ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy - -import scala.concurrent.Future -import akka.util.Timeout import akka.pattern.ask +import akka.remote.testconductor.RoleName +import akka.testkit._ +import akka.util.Timeout + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration._ object ClusterShardingCustomShardAllocationSpec { - class Entity extends Actor { - def receive = { - case id: Int => sender() ! id - } - } - - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = { - case id: Int => id.toString - } case object AllocateReq case class UseRegion(region: ActorRef) @@ -81,37 +60,23 @@ object ClusterShardingCustomShardAllocationSpec { } -abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingCustomShardAllocationSpecConfig(mode: String) + extends MultiNodeClusterShardingConfig( + mode, + additionalConfig = s""" + akka.cluster.sharding.rebalance-interval = 1 s + akka.persistence.journal.leveldb-shared.store.native = off + """) { + val first = role("first") val second = role("second") - commonConfig( - ConfigFactory - .parseString(s""" - akka.actor.provider = "cluster" - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ClusterShardingCustomShardAllocationSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.rebalance-interval = 1 s - #akka.cluster.sharding.retry-interval = 5 s - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) } object PersistentClusterShardingCustomShardAllocationSpecConfig - extends ClusterShardingCustomShardAllocationSpecConfig("persistence") + extends ClusterShardingCustomShardAllocationSpecConfig(ClusterShardingSettings.StateStoreModePersistence) object DDataClusterShardingCustomShardAllocationSpecConfig - extends ClusterShardingCustomShardAllocationSpecConfig("ddata") + extends ClusterShardingCustomShardAllocationSpecConfig(ClusterShardingSettings.StateStoreModeDData) class PersistentClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(PersistentClusterShardingCustomShardAllocationSpecConfig) @@ -126,62 +91,35 @@ class PersistentClusterShardingCustomShardAllocationMultiJvmNode2 class DDataClusterShardingCustomShardAllocationMultiJvmNode1 extends DDataClusterShardingCustomShardAllocationSpec class DDataClusterShardingCustomShardAllocationMultiJvmNode2 extends DDataClusterShardingCustomShardAllocationSpec -abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingCustomShardAllocationSpec(multiNodeConfig: ClusterShardingCustomShardAllocationSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { - import ClusterShardingCustomShardAllocationSpec._ - import config._ - override def initialParticipants = roles.size + import ClusterShardingCustomShardAllocationSpec._ + import multiNodeConfig._ def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - startSharding() - } - enterBarrier(from.name + "-joined") - } - - def startSharding(): Unit = { - ClusterSharding(system).start( - typeName = "Entity", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - allocationStrategy = TestAllocationStrategy(allocator), - handOffStopMessage = PoisonPill) + join( + from, + to, + startSharding( + system, + typeName = "Entity", + entityProps = TestActors.echoActorProps, + extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, + extractShardId = MultiNodeClusterShardingSpec.intExtractShardId, + allocationStrategy = TestAllocationStrategy(allocator))) } lazy val region = ClusterSharding(system).shardRegion("Entity") lazy val allocator = system.actorOf(Props[Allocator], "allocator") - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster sharding ($mode) with custom allocation strategy" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("persistence-started") - - runOn(first, second) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") - } - } - "use specified region" in within(30.seconds) { + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second)) + join(first, first) runOn(first) { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index e86debe123..7b0e391541 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -4,27 +4,16 @@ package akka.cluster.sharding -import java.io.File - -import akka.cluster.sharding.ShardRegion.Passivate -import scala.concurrent.duration._ - -import org.apache.commons.io.FileUtils -import com.typesafe.config.ConfigFactory import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.persistence.journal.leveldb.SharedLeveldbStore +import akka.cluster.sharding.ShardRegion.Passivate import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.serialization.jackson.CborSerializable import akka.testkit._ import akka.util.ccompat._ +import scala.concurrent.duration._ + @ccompatUsedUntil213 object ClusterShardingFailureSpec { case class Get(id: String) extends CborSerializable @@ -49,56 +38,36 @@ object ClusterShardingFailureSpec { case Get(id) => id.charAt(0).toString case Add(id, _) => id.charAt(0).toString } - } -abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingFailureSpecConfig(override val mode: String) + extends MultiNodeClusterShardingConfig( + mode, + additionalConfig = s""" + akka.cluster.roles = ["backend"] + akka.cluster.sharding { + coordinator-failure-backoff = 3s + shard-failure-backoff = 3s + } + akka.persistence.journal.leveldb-shared.store.native = off + # using Java serialization for these messages because test is sending them + # to other nodes, which isn't normal usage. + akka.actor.serialization-bindings { + "${classOf[ShardRegion.Passivate].getName}" = java-test + } + """) { + val controller = role("controller") val first = role("first") val second = role("second") - commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.classic.log-remote-lifecycle-events = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.cluster.roles = ["backend"] - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ClusterShardingFailureSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingFailureSpec/snapshots" - akka.cluster.sharding { - coordinator-failure-backoff = 3s - shard-failure-backoff = 3s - state-store-mode = "$mode" - } - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingFailureSpec/sharding-ddata - map-size = 10 MiB - } - # using Java serialization for these messages because test is sending them - # to other nodes, which isn't normal usage. - akka.actor.serialization-bindings { - "${classOf[ShardRegion.Passivate].getName}" = java-test - } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) - testTransport(on = true) } -object PersistentClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("persistence") -object DDataClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("ddata") +object PersistentClusterShardingFailureSpecConfig + extends ClusterShardingFailureSpecConfig(ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingFailureSpecConfig + extends ClusterShardingFailureSpecConfig(ClusterShardingSettings.StateStoreModeDData) class PersistentClusterShardingFailureSpec extends ClusterShardingFailureSpec(PersistentClusterShardingFailureSpecConfig) @@ -112,79 +81,31 @@ class DDataClusterShardingFailureMultiJvmNode1 extends DDataClusterShardingFailu class DDataClusterShardingFailureMultiJvmNode2 extends DDataClusterShardingFailureSpec class DDataClusterShardingFailureMultiJvmNode3 extends DDataClusterShardingFailureSpec -abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailureSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { import ClusterShardingFailureSpec._ - import config._ - - override def initialParticipants = roles.size - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - val cluster = Cluster(system) + import multiNodeConfig._ def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster.join(node(to).address) - startSharding() - - within(remaining) { - awaitAssert { - cluster.state.members.unsorted.map(_.uniqueAddress) should contain(cluster.selfUniqueAddress) - cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up)) - } - } - } - enterBarrier(from.name + "-joined") - } - - def startSharding(): Unit = { - ClusterSharding(system).start( - typeName = "Entity", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system).withRememberEntities(true), - extractEntityId = extractEntityId, - extractShardId = extractShardId) + join( + from, + to, + startSharding( + system, + typeName = "Entity", + entityProps = Props[Entity], + extractEntityId = extractEntityId, + extractShardId = extractShardId)) } lazy val region = ClusterSharding(system).shardRegion("Entity") - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster sharding ($mode) with flaky journal/network" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(controller) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("persistence-started") - - runOn(first, second) { - system.actorSelection(node(controller) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") - } - } - "join cluster" in within(20.seconds) { + startPersistenceIfNotDdataMode(startOn = controller, setStoreOn = Seq(first, second)) + join(first, first) join(second, first) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala index 6a3ea6a047..5701d8ee51 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStateSpec.scala @@ -4,64 +4,40 @@ package akka.cluster.sharding -import akka.actor._ -import akka.cluster.{ Cluster, MultiNodeClusterSpec } -import akka.cluster.ClusterEvent.CurrentClusterState -import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -import akka.serialization.jackson.CborSerializable +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory object ClusterShardingGetStateSpec { - case object Stop extends CborSerializable - case class Ping(id: Long) extends CborSerializable - case object Pong extends CborSerializable - - class ShardedActor extends Actor with ActorLogging { - log.info(self.path.toString) - def receive = { - case Stop => context.stop(self) - case _: Ping => sender() ! Pong - } - } + import MultiNodeClusterShardingSpec.PingPongActor val extractEntityId: ShardRegion.ExtractEntityId = { - case msg @ Ping(id) => (id.toString, msg) + case msg @ PingPongActor.Ping(id) => (id.toString, msg) } val numberOfShards = 2 val extractShardId: ShardRegion.ExtractShardId = { - case Ping(id) => (id % numberOfShards).toString + case PingPongActor.Ping(id) => (id % numberOfShards).toString } val shardTypeName = "Ping" } -object ClusterShardingGetStateSpecConfig extends MultiNodeConfig { - val controller = role("controller") - val first = role("first") - val second = role("second") - - commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.log-remote-lifecycle-events = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s +object ClusterShardingGetStateSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s""" akka.cluster.sharding { coordinator-failure-backoff = 3s shard-failure-backoff = 3s - state-store-mode = "ddata" } - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingGetStateSpec/sharding-ddata - map-size = 10 MiB - } - """).withFallback(MultiNodeClusterSpec.clusterConfig)) + """) { + + val controller = role("controller") + val first = role("first") + val second = role("second") nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]""")) @@ -71,45 +47,18 @@ class ClusterShardingGetStateSpecMultiJvmNode1 extends ClusterShardingGetStateSp class ClusterShardingGetStateSpecMultiJvmNode2 extends ClusterShardingGetStateSpec class ClusterShardingGetStateSpecMultiJvmNode3 extends ClusterShardingGetStateSpec -abstract class ClusterShardingGetStateSpec - extends MultiNodeSpec(ClusterShardingGetStateSpecConfig) - with STMultiNodeSpec { +abstract class ClusterShardingGetStateSpec extends MultiNodeClusterShardingSpec(ClusterShardingGetStateSpecConfig) { import ClusterShardingGetStateSpec._ import ClusterShardingGetStateSpecConfig._ - - def initialParticipants = roles.size - - def startShard(): ActorRef = { - ClusterSharding(system).start( - typeName = shardTypeName, - entityProps = Props(new ShardedActor), - settings = ClusterShardingSettings(system).withRole("shard"), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - - def startProxy(): ActorRef = { - ClusterSharding(system).startProxy( - typeName = shardTypeName, - role = Some("shard"), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - - def join(from: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(controller).address) - } - enterBarrier(from.name + "-joined") - } + import MultiNodeClusterShardingSpec.PingPongActor "Inspecting cluster sharding state" must { "join cluster" in { - join(controller) - join(first) - join(second) + join(controller, controller) + join(first, controller) + join(second, controller) // make sure all nodes has joined awaitAssert { @@ -118,10 +67,22 @@ abstract class ClusterShardingGetStateSpec } runOn(controller) { - startProxy() + startProxy( + system, + typeName = shardTypeName, + role = Some("shard"), + extractEntityId = extractEntityId, + extractShardId = extractShardId) } + runOn(first, second) { - startShard() + startSharding( + system, + typeName = shardTypeName, + entityProps = Props(new PingPongActor), + settings = settings.withRole("shard"), + extractEntityId = extractEntityId, + extractShardId = extractShardId) } enterBarrier("sharding started") @@ -147,9 +108,9 @@ abstract class ClusterShardingGetStateSpec awaitAssert { val pingProbe = TestProbe() // trigger starting of 4 entities - (1 to 4).foreach(n => region.tell(Ping(n), pingProbe.ref)) + (1 to 4).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 4) { - case Pong => () + case PingPongActor.Pong => () } } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 57e89e7eaa..223453f3bd 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -7,68 +7,37 @@ package akka.cluster.sharding import scala.concurrent.duration._ import akka.actor._ -import akka.cluster.Cluster -import akka.cluster.MemberStatus -import akka.cluster.MultiNodeClusterSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import akka.serialization.jackson.CborSerializable -import akka.testkit.TestDuration -import akka.testkit.TestProbe +import akka.cluster.{ Cluster, MemberStatus } +import akka.testkit.{ TestDuration, TestProbe } import com.typesafe.config.ConfigFactory object ClusterShardingGetStatsSpec { - case object Stop extends CborSerializable - case class Ping(id: Long) extends CborSerializable - case object Pong extends CborSerializable + import MultiNodeClusterShardingSpec.PingPongActor - class ShardedActor extends Actor with ActorLogging { - log.info(s"entity started {}", self.path) - def receive = { - case Stop => context.stop(self) - case _: Ping => sender() ! Pong - } - } - - val extractEntityId: ShardRegion.ExtractEntityId = { - case msg @ Ping(id) => (id.toString, msg) - } + val shardTypeName = "Ping" val numberOfShards = 3 - val extractShardId: ShardRegion.ExtractShardId = { - case Ping(id) => (id % numberOfShards).toString + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg @ PingPongActor.Ping(id) => (id.toString, msg) + } + val extractShardId: ShardRegion.ExtractShardId = { + case PingPongActor.Ping(id) => (id % numberOfShards).toString } - - val shardTypeName = "Ping" } -object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig { +object ClusterShardingGetStatsSpecConfig + extends MultiNodeClusterShardingConfig(additionalConfig = """ + akka.log-dead-letters-during-shutdown = off + akka.cluster.sharding.updating-state-timeout = 2s + akka.cluster.sharding.waiting-for-state-timeout = 2s + """) { + val controller = role("controller") val first = role("first") val second = role("second") val third = role("third") - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.classic.log-remote-lifecycle-events = off - akka.log-dead-letters-during-shutdown = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.cluster.sharding { - state-store-mode = "ddata" - updating-state-timeout = 2s - waiting-for-state-timeout = 2s - } - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingGetStatsSpec/sharding-ddata - map-size = 10 MiB - } - """).withFallback(MultiNodeClusterSpec.clusterConfig)) - nodeConfig(first, second, third)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]""")) } @@ -78,48 +47,30 @@ class ClusterShardingGetStatsSpecMultiJvmNode2 extends ClusterShardingGetStatsSp class ClusterShardingGetStatsSpecMultiJvmNode3 extends ClusterShardingGetStatsSpec class ClusterShardingGetStatsSpecMultiJvmNode4 extends ClusterShardingGetStatsSpec -abstract class ClusterShardingGetStatsSpec - extends MultiNodeSpec(ClusterShardingGetStatsSpecConfig) - with STMultiNodeSpec { +abstract class ClusterShardingGetStatsSpec extends MultiNodeClusterShardingSpec(ClusterShardingGetStatsSpecConfig) { import ClusterShardingGetStatsSpec._ import ClusterShardingGetStatsSpecConfig._ - - def initialParticipants = roles.size + import MultiNodeClusterShardingSpec.PingPongActor def startShard(): ActorRef = { - ClusterSharding(system).start( + startSharding( + system, typeName = shardTypeName, - entityProps = Props(new ShardedActor), - settings = ClusterShardingSettings(system).withRole("shard"), + entityProps = Props(new PingPongActor), + settings = settings.withRole("shard"), extractEntityId = extractEntityId, extractShardId = extractShardId) } - def startProxy(): ActorRef = { - ClusterSharding(system).startProxy( - typeName = shardTypeName, - role = Some("shard"), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - - def join(from: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(controller).address) - } - enterBarrier(from.name + "-joined") - } - lazy val region = ClusterSharding(system).shardRegion(shardTypeName) "Inspecting cluster sharding state" must { "join cluster" in { - join(controller) - join(first) - join(second) - join(third) + Seq(controller, first, second, third).foreach { node => + join(from = node, to = controller) + } // make sure all nodes are up within(10.seconds) { @@ -129,7 +80,12 @@ abstract class ClusterShardingGetStatsSpec } runOn(controller) { - startProxy() + startProxy( + system, + typeName = shardTypeName, + role = Some("shard"), + extractEntityId = extractEntityId, + extractShardId = extractShardId) } runOn(first, second, third) { startShard() @@ -162,9 +118,9 @@ abstract class ClusterShardingGetStatsSpec val pingProbe = TestProbe() // trigger starting of 2 entities on first and second node // but leave third node without entities - List(1, 2, 4, 6).foreach(n => region.tell(Ping(n), pingProbe.ref)) + List(1, 2, 4, 6).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 4) { - case Pong => () + case PingPongActor.Pong => () } } } @@ -209,9 +165,9 @@ abstract class ClusterShardingGetStatsSpec awaitAssert { val pingProbe = TestProbe() // make sure we have the 4 entities still alive across the fewer nodes - List(1, 2, 4, 6).foreach(n => region.tell(Ping(n), pingProbe.ref)) + List(1, 2, 4, 6).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 4) { - case Pong => () + case PingPongActor.Pong => () } } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 916ef65903..1a67f3e187 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -4,77 +4,25 @@ package akka.cluster.sharding -import scala.concurrent.duration._ -import java.io.File - import akka.actor._ -import akka.cluster.{ Cluster, MultiNodeClusterSpec } import akka.cluster.sharding.ShardRegion.GracefulShutdown -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.testkit._ -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils import scala.concurrent.duration._ -object ClusterShardingGracefulShutdownSpec { - case object StopEntity - - class Entity extends Actor { - def receive = { - case id: Int => sender() ! id - case StopEntity => - context.stop(self) - } - } - - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = msg => - msg match { - case id: Int => id.toString - } - -} - -abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingGracefulShutdownSpecConfig(mode: String) + extends MultiNodeClusterShardingConfig( + mode, + additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") { val first = role("first") val second = role("second") - - commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ClusterShardingGracefulShutdownSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingGracefulShutdownSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingGracefulShutdownSpec/sharding-ddata - map-size = 10 MiB - } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) } object PersistentClusterShardingGracefulShutdownSpecConfig - extends ClusterShardingGracefulShutdownSpecConfig("persistence") -object DDataClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("ddata") + extends ClusterShardingGracefulShutdownSpecConfig(ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingGracefulShutdownSpecConfig + extends ClusterShardingGracefulShutdownSpecConfig(ClusterShardingSettings.StateStoreModeDData) class PersistentClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(PersistentClusterShardingGracefulShutdownSpecConfig) @@ -87,76 +35,43 @@ class PersistentClusterShardingGracefulShutdownMultiJvmNode2 extends PersistentC class DDataClusterShardingGracefulShutdownMultiJvmNode1 extends DDataClusterShardingGracefulShutdownSpec class DDataClusterShardingGracefulShutdownMultiJvmNode2 extends DDataClusterShardingGracefulShutdownSpec -abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShardingGracefulShutdownSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { - import ClusterShardingGracefulShutdownSpec._ - import config._ - override def initialParticipants = roles.size + import MultiNodeClusterShardingSpec.ShardedEntity + import multiNodeConfig._ - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) + private val typeName = "Entity" - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - def join(from: RoleName, to: RoleName): Unit = { + def join(from: RoleName, to: RoleName, typeName: String): Unit = { + super.join(from, to) runOn(from) { - Cluster(system).join(node(to).address) - startSharding() + startSharding(typeName) } - enterBarrier(from.name + "-joined") + enterBarrier(s"$from-started") } - def startSharding(): Unit = { - val allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - ClusterSharding(system).start( - typeName = "Entity", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - allocationStrategy, - handOffStopMessage = StopEntity) - } + def startSharding(typeName: String): ActorRef = + startSharding( + system, + typeName, + entityProps = Props[ShardedEntity], + extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, + extractShardId = MultiNodeClusterShardingSpec.intExtractShardId, + allocationStrategy = + new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1), + handOffStopMessage = ShardedEntity.Stop) - lazy val region = ClusterSharding(system).shardRegion("Entity") - - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + lazy val region = ClusterSharding(system).shardRegion(typeName) s"Cluster sharding ($mode)" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") - - runOn(first, second) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") - } - } - "start some shards in both regions" in within(30.seconds) { - join(first, first) - join(second, first) + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second)) + + join(first, first, typeName) + join(second, first, typeName) awaitAssert { val p = TestProbe() @@ -197,16 +112,7 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracef "gracefully shutdown empty region" in within(30.seconds) { runOn(first) { - val allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - val regionEmpty = ClusterSharding(system).start( - typeName = "EntityEmpty", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - allocationStrategy, - handOffStopMessage = StopEntity) + val regionEmpty = startSharding(typeName = "EntityEmpty") watch(regionEmpty) regionEmpty ! GracefulShutdown diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingIncorrectSetupSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingIncorrectSetupSpec.scala index 071cc2a98e..57e68e3976 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingIncorrectSetupSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingIncorrectSetupSpec.scala @@ -4,44 +4,23 @@ package akka.cluster.sharding -import akka.cluster.MultiNodeClusterSpec -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.testkit._ -import com.typesafe.config.ConfigFactory -object ClusterShardingIncorrectSetupSpecConfig extends MultiNodeConfig { +object ClusterShardingIncorrectSetupSpecConfig + extends MultiNodeClusterShardingConfig(additionalConfig = "akka.cluster.sharding.waiting-for-state-timeout = 100ms") { + val first = role("first") val second = role("second") - val commonConfig = ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.cluster.sharding { - waiting-for-state-timeout = 100ms - } - """.stripMargin) - - commonConfig(commonConfig.withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClusterShardingIncorrectSetupMultiJvmNode1 extends ClusterShardingIncorrectSetupSpec class ClusterShardingIncorrectSetupMultiJvmNode2 extends ClusterShardingIncorrectSetupSpec -object ClusterShardingIncorrectSetupSpec { - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = { - case id: Int => id.toString - } -} - abstract class ClusterShardingIncorrectSetupSpec - extends MultiNodeSpec(ClusterShardingIncorrectSetupSpecConfig) - with MultiNodeClusterSpec + extends MultiNodeClusterShardingSpec(ClusterShardingIncorrectSetupSpecConfig) with ImplicitSender { - import ClusterShardingIncorrectSetupSpec._ import ClusterShardingIncorrectSetupSpecConfig._ "Cluster sharding" must { @@ -50,12 +29,7 @@ abstract class ClusterShardingIncorrectSetupSpec enterBarrier("cluster-up") runOn(first) { EventFilter.error(pattern = """Has ClusterSharding been started on all nodes?""").intercept { - ClusterSharding(system).start( - typeName = "Entity", - entityProps = TestActors.echoActorProps, - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId) + startSharding(system, typeName = "Entity", entityProps = TestActors.echoActorProps) } } enterBarrier("helpful error message logged") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 8123f22899..b59718a954 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -4,27 +4,11 @@ package akka.cluster.sharding -import java.io.File - import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorIdentity -import akka.actor.ActorRef -import akka.actor.Identify -import akka.actor.Props -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.persistence.journal.leveldb.SharedLeveldbStore -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec +import akka.actor.{ Actor, ActorRef, Props } import akka.serialization.jackson.CborSerializable import akka.testkit._ -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils object ClusterShardingLeavingSpec { case class Ping(id: String) extends CborSerializable @@ -55,42 +39,18 @@ object ClusterShardingLeavingSpec { } } -abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingLeavingSpecConfig(mode: String) extends MultiNodeClusterShardingConfig(mode) { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.classic.log-remote-lifecycle-events = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ClusterShardingLeavingSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingLeavingSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingLeavingSpec/sharding-ddata - map-size = 10 MiB - } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) } -object PersistentClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("persistence") -object DDataClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("ddata") +object PersistentClusterShardingLeavingSpecConfig + extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingLeavingSpecConfig + extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModeDData) class PersistentClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig) @@ -106,79 +66,32 @@ class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavi class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec -abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavingSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { import ClusterShardingLeavingSpec._ - import config._ - - override def initialParticipants = roles.size - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - val cluster = Cluster(system) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster.join(node(to).address) - startSharding() - within(15.seconds) { - awaitAssert(cluster.state.members.exists { m => - m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up - } should be(true)) - } - } - enterBarrier(from.name + "-joined") - } + import multiNodeConfig._ def startSharding(): Unit = { - ClusterSharding(system).start( + startSharding( + system, typeName = "Entity", entityProps = Props[Entity], - settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) } lazy val region = ClusterSharding(system).shardRegion("Entity") - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster sharding ($mode) with leaving member" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") - - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - - enterBarrier("after-1") - } - } - "join cluster" in within(20.seconds) { - join(first, first) - join(second, first) - join(third, first) - join(fourth, first) + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = roles) + + join(first, first, onJoinedRunOnFrom = startSharding()) + join(second, first, onJoinedRunOnFrom = startSharding()) + join(third, first, onJoinedRunOnFrom = startSharding()) + join(fourth, first, onJoinedRunOnFrom = startSharding()) enterBarrier("after-2") } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index 3ad2f1fb20..77b578c022 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -4,73 +4,33 @@ package akka.cluster.sharding -import java.io.File - import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } -import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.cluster.MemberStatus +import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats } import akka.testkit._ -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils - -import scala.concurrent.duration._ -import akka.cluster.sharding.ShardRegion.GetClusterShardingStats -import akka.cluster.sharding.ShardRegion.ClusterShardingStats import akka.util.ccompat._ +import scala.concurrent.duration._ + @ccompatUsedUntil213 -object ClusterShardingMinMembersSpec { - case object StopEntity +abstract class ClusterShardingMinMembersSpecConfig(mode: String) + extends MultiNodeClusterShardingConfig( + mode, + additionalConfig = s""" + akka.cluster.sharding.rebalance-interval = 120s #disable rebalance + akka.cluster.min-nr-of-members = 3 + """) { - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = msg => - msg match { - case id: Int => id.toString - } - -} - -abstract class ClusterShardingMinMembersSpecConfig(val mode: String) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") - commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ClusterShardingMinMembersSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingMinMembersSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.rebalance-interval = 120s #disable rebalance - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingMinMembersSpec/sharding-ddata - map-size = 10 MiB - } - akka.cluster.min-nr-of-members = 3 - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) } -object PersistentClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("persistence") -object DDataClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("ddata") +object PersistentClusterShardingMinMembersSpecConfig + extends ClusterShardingMinMembersSpecConfig(ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingMinMembersSpecConfig + extends ClusterShardingMinMembersSpecConfig(ClusterShardingSettings.StateStoreModeDData) class PersistentClusterShardingMinMembersSpec extends ClusterShardingMinMembersSpec(PersistentClusterShardingMinMembersSpecConfig) @@ -84,84 +44,35 @@ class DDataClusterShardingMinMembersMultiJvmNode1 extends DDataClusterShardingMi class DDataClusterShardingMinMembersMultiJvmNode2 extends DDataClusterShardingMinMembersSpec class DDataClusterShardingMinMembersMultiJvmNode3 extends DDataClusterShardingMinMembersSpec -abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMinMembersSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { - import ClusterShardingMinMembersSpec._ - import config._ - - override def initialParticipants = roles.size - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - } - enterBarrier(from.name + "-joined") - } - - val cluster = Cluster(system) + import MultiNodeClusterShardingSpec.ShardedEntity + import multiNodeConfig._ def startSharding(): Unit = { - val allocationStrategy = - new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - ClusterSharding(system).start( + startSharding( + system, typeName = "Entity", entityProps = TestActors.echoActorProps, - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - allocationStrategy, - handOffStopMessage = StopEntity) + extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, + extractShardId = MultiNodeClusterShardingSpec.intExtractShardId, + allocationStrategy = + new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1), + handOffStopMessage = ShardedEntity.Stop) } lazy val region = ClusterSharding(system).shardRegion("Entity") - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster with min-nr-of-members using sharding ($mode)" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("peristence-started") - - runOn(first, second, third) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") - } - } - "use all nodes" in within(30.seconds) { - join(first, first) - runOn(first) { - startSharding() - } - join(second, first) - runOn(second) { - startSharding() - } - join(third, first) + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third)) + + // the only test not asserting join status before starting to shard + join(first, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + join(second, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + join(third, first, assertNodeUp = false) // wait with starting sharding on third within(remaining) { awaitAssert { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala index c54e6caef5..762ee1f72b 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala @@ -4,67 +4,44 @@ package akka.cluster.sharding -import akka.actor.{ Actor, ActorLogging, ActorRef, Props } -import akka.cluster.MultiNodeClusterSpec -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } -import akka.serialization.jackson.CborSerializable +import scala.concurrent.duration._ + +import akka.actor.Props import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures -import scala.concurrent.duration._ - object ClusterShardingQueriesSpec { - case class Ping(id: Long) extends CborSerializable - case object Pong extends CborSerializable - - class EntityActor extends Actor with ActorLogging { - def receive: Receive = { - case _: Ping => sender() ! Pong - } - } + import MultiNodeClusterShardingSpec.PingPongActor val extractEntityId: ShardRegion.ExtractEntityId = { - case msg @ Ping(id) => (id.toString, msg) + case msg @ PingPongActor.Ping(id) => (id.toString, msg) } val numberOfShards = 6 val extractShardId: ShardRegion.ExtractShardId = { - case Ping(id) => (id % numberOfShards).toString + case PingPongActor.Ping(id) => (id % numberOfShards).toString } val shardTypeName = "DatatypeA" } -object ClusterShardingQueriesSpecConfig extends MultiNodeConfig { +object ClusterShardingQueriesSpecConfig + extends MultiNodeClusterShardingConfig(additionalConfig = """ + akka.log-dead-letters-during-shutdown = off + akka.cluster.sharding { + shard-region-query-timeout = 2ms + updating-state-timeout = 2s + waiting-for-state-timeout = 2s + } + """) { val controller = role("controller") val busy = role("busy") val second = role("second") val third = role("third") - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.classic.log-remote-lifecycle-events = off - akka.log-dead-letters-during-shutdown = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.cluster.sharding { - state-store-mode = "ddata" - shard-region-query-timeout = 2ms - updating-state-timeout = 2s - waiting-for-state-timeout = 2s - } - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingGetStatsSpec/sharding-ddata - map-size = 10 MiB - } - """).withFallback(MultiNodeClusterSpec.clusterConfig))) - val shardRoles = ConfigFactory.parseString("""akka.cluster.roles=["shard"]""") nodeConfig(busy)( @@ -79,29 +56,12 @@ class ClusterShardingQueriesSpecMultiJvmNode3 extends ClusterShardingQueriesSpec class ClusterShardingQueriesSpecMultiJvmNode4 extends ClusterShardingQueriesSpec abstract class ClusterShardingQueriesSpec - extends MultiNodeSpec(ClusterShardingQueriesSpecConfig) - with MultiNodeClusterSpec + extends MultiNodeClusterShardingSpec(ClusterShardingQueriesSpecConfig) with ScalaFutures { import ClusterShardingQueriesSpec._ import ClusterShardingQueriesSpecConfig._ - - def startShard(): ActorRef = { - ClusterSharding(system).start( - typeName = shardTypeName, - entityProps = Props(new EntityActor), - settings = ClusterShardingSettings(system).withRole("shard"), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - - def startProxy(): ActorRef = { - ClusterSharding(system).startProxy( - typeName = shardTypeName, - role = Some("shard"), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } + import MultiNodeClusterShardingSpec.PingPongActor lazy val region = ClusterSharding(system).shardRegion(shardTypeName) @@ -111,11 +71,22 @@ abstract class ClusterShardingQueriesSpec awaitClusterUp(controller, busy, second, third) runOn(controller) { - startProxy() + startProxy( + system, + typeName = shardTypeName, + role = Some("shard"), + extractEntityId = extractEntityId, + extractShardId = extractShardId) } runOn(busy, second, third) { - startShard() + startSharding( + system, + typeName = shardTypeName, + entityProps = Props(new PingPongActor), + settings = settings.withRole("shard"), + extractEntityId = extractEntityId, + extractShardId = extractShardId) } enterBarrier("sharding started") @@ -126,9 +97,9 @@ abstract class ClusterShardingQueriesSpec within(10.seconds) { awaitAssert { val pingProbe = TestProbe() - (0 to 20).foreach(n => region.tell(Ping(n), pingProbe.ref)) + (0 to 20).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 20) { - case Pong => () + case PingPongActor.Pong => () } } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRegistrationCoordinatedShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRegistrationCoordinatedShutdownSpec.scala index 5e35bc2552..8326ed0326 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRegistrationCoordinatedShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRegistrationCoordinatedShutdownSpec.scala @@ -4,48 +4,23 @@ package akka.cluster.sharding -import scala.concurrent.Future -import scala.concurrent.duration._ - import akka.Done import akka.actor._ -import akka.cluster.Cluster import akka.cluster.MemberStatus -import akka.cluster.MultiNodeClusterSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory +import akka.testkit.{ ImplicitSender, TestProbe } + +import scala.concurrent.Future +import scala.concurrent.duration._ /** * Test for issue #28416 */ -object ClusterShardingRegistrationCoordinatedShutdownSpec extends MultiNodeConfig { +object ClusterShardingRegistrationCoordinatedShutdownSpec extends MultiNodeClusterShardingConfig { + val first = role("first") val second = role("second") val third = role("third") - commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = DEBUG # FIXME - """).withFallback(MultiNodeClusterSpec.clusterConfig)) - - class Entity extends Actor { - def receive = { - case id: Int => sender() ! id - } - } - - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = msg => - msg match { - case id: Int => id.toString - } - } class ClusterShardingRegistrationCoordinatedShutdownMultiJvmNode1 @@ -56,35 +31,14 @@ class ClusterShardingRegistrationCoordinatedShutdownMultiJvmNode3 extends ClusterShardingRegistrationCoordinatedShutdownSpec abstract class ClusterShardingRegistrationCoordinatedShutdownSpec - extends MultiNodeSpec(ClusterShardingRegistrationCoordinatedShutdownSpec) - with STMultiNodeSpec + extends MultiNodeClusterShardingSpec(ClusterShardingRegistrationCoordinatedShutdownSpec) with ImplicitSender { + import ClusterShardingRegistrationCoordinatedShutdownSpec._ + import MultiNodeClusterShardingSpec.ShardedEntity - override def initialParticipants: Int = roles.size - - private val cluster = Cluster(system) private lazy val region = ClusterSharding(system).shardRegion("Entity") - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster.join(node(to).address) - } - runOn(from) { - cluster.state.members.exists(m => m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up) - } - enterBarrier(from.name + "-joined") - } - - def startSharding(): Unit = { - ClusterSharding(system).start( - typeName = "Entity", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - s"Region registration during CoordinatedShutdown" must { "try next oldest" in within(30.seconds) { @@ -109,7 +63,12 @@ abstract class ClusterShardingRegistrationCoordinatedShutdownSpec } - startSharding() + startSharding( + system, + typeName = "Entity", + entityProps = Props[ShardedEntity], + extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, + extractShardId = MultiNodeClusterShardingSpec.intExtractShardId) enterBarrier("before-shutdown") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala index cad0ec91de..cfa8ac381f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala @@ -4,17 +4,11 @@ package akka.cluster.sharding -import java.io.File - import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } -import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.cluster.{ Cluster, MemberStatus } +import akka.persistence.journal.leveldb.SharedLeveldbJournal import akka.testkit._ import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils import scala.concurrent.duration._ @@ -52,37 +46,14 @@ object ClusterShardingRememberEntitiesNewExtractorSpec { } -abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: String) extends MultiNodeConfig { +abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(mode: String) + extends MultiNodeClusterShardingConfig( + mode, + additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") { val first = role("first") val second = role("second") val third = role("third") - commonConfig( - ConfigFactory - .parseString(s""" - akka.actor.provider = "cluster" - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.remote.classic.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/ShardingRememberEntitiesNewExtractorSpec/journal" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesNewExtractorSpec/snapshots" - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-ddata - map-size = 10 MiB - } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) - val roleConfig = ConfigFactory.parseString(""" akka.cluster.roles = [sharding] """) @@ -132,82 +103,41 @@ class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec abstract class ClusterShardingRememberEntitiesNewExtractorSpec( - config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec + multiNodeConfig: ClusterShardingRememberEntitiesNewExtractorSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { import ClusterShardingRememberEntitiesNewExtractorSpec._ - import config._ + import multiNodeConfig._ val typeName = "Entity" - override def initialParticipants = roles.size - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - } - enterBarrier(from.name + "-joined") - } - - val cluster = Cluster(system) - def startShardingWithExtractor1(): Unit = { - ClusterSharding(system).start( + startSharding( + system, typeName = typeName, entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(None), - settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"), + settings = settings.withRole("sharding"), extractEntityId = extractEntityId, extractShardId = extractShardId1) } def startShardingWithExtractor2(sys: ActorSystem, probe: ActorRef): Unit = { - ClusterSharding(sys).start( + startSharding( + sys, typeName = typeName, entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(probe)), - settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"), + settings = ClusterShardingSettings(sys).withRememberEntities(config.rememberEntities).withRole("sharding"), extractEntityId = extractEntityId, extractShardId = extractShardId2) } def region(sys: ActorSystem = system) = ClusterSharding(sys).shardRegion(typeName) - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster with min-nr-of-members using sharding ($mode)" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("persistence-started") - - runOn(second, third) { - system.actorSelection(node(first) / "user" / "store") ! Identify(None) - val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, system) - } - - enterBarrier("after-1") - } - } - "start up first cluster and sharding" in within(15.seconds) { + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(second, third)) + join(first, first) join(second, first) join(third, first) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala index cee71ab582..28e5e2255d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala @@ -4,23 +4,15 @@ package akka.cluster.sharding -import java.io.File import java.util.concurrent.TimeUnit.NANOSECONDS -import scala.concurrent.duration._ - import akka.actor._ -import akka.cluster.Cluster import akka.cluster.MemberStatus -import akka.cluster.MultiNodeClusterSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.util.ccompat._ import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils + +import scala.concurrent.duration._ @ccompatUsedUntil213 object ClusterShardingRememberEntitiesPerfSpec { @@ -48,32 +40,22 @@ object ClusterShardingRememberEntitiesPerfSpec { } -object ClusterShardingRememberEntitiesPerfSpecConfig extends MultiNodeConfig { +object ClusterShardingRememberEntitiesPerfSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s""" + akka.testconductor.barrier-timeout = 3 minutes + akka.remote.artery.advanced.outbound-message-queue-size = 10000 + akka.remote.artery.advanced.maximum-frame-size = 512 KiB + # comment next line to enable durable lmdb storage + akka.cluster.sharding.distributed-data.durable.keys = [] + """) { + val first = role("first") val second = role("second") val third = role("third") - commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.remote.log-remote-lifecycle-events = off - akka.testconductor.barrier-timeout = 3 minutes - akka.remote.artery.advanced.outbound-message-queue-size = 10000 - akka.remote.artery.advanced.maximum-frame-size = 512 KiB - akka.cluster.sharding.state-store-mode = "ddata" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ShardingRememberEntitiesPerfSpec/sharding-ddata - } - # comment next line to enable durable lmdb storage - akka.cluster.sharding.distributed-data.durable.keys = [] - """).withFallback(MultiNodeClusterSpec.clusterConfig)) - nodeConfig(third)(ConfigFactory.parseString(s""" akka.cluster.sharding.distributed-data.durable.lmdb { # use same directory when starting new node on third (not used at same time) - dir = target/ShardingRememberEntitiesSpec/sharding-third + dir = "$targetDir/sharding-third" } """)) } @@ -83,41 +65,17 @@ class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardi class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec abstract class ClusterShardingRememberEntitiesPerfSpec - extends MultiNodeSpec(ClusterShardingRememberEntitiesPerfSpecConfig) - with STMultiNodeSpec + extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig) with ImplicitSender { import ClusterShardingRememberEntitiesPerfSpec._ import ClusterShardingRememberEntitiesPerfSpecConfig._ - override def initialParticipants = roles.size - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - } - enterBarrier(from.name + "-joined") - } - - val cluster = Cluster(system) - def startSharding(): Unit = { (1 to 3).foreach { n => - ClusterSharding(system).start( + startSharding( + system, typeName = s"Entity$n", entityProps = ClusterShardingRememberEntitiesPerfSpec.props(), - settings = ClusterShardingSettings(system).withRememberEntities(true), extractEntityId = extractEntityId, extractShardId = extractShardId) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 68deedf986..696a9b6185 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -4,36 +4,17 @@ package akka.cluster.sharding -import java.io.File +import scala.concurrent.duration._ import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } -import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.cluster.{ Cluster, MemberStatus } import akka.testkit._ -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils import akka.util.ccompat._ - -import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory @ccompatUsedUntil213 object ClusterShardingRememberEntitiesSpec { - final case class Started(ref: ActorRef) - - def props(probe: ActorRef): Props = Props(new TestEntity(probe)) - - class TestEntity(probe: ActorRef) extends Actor { - probe ! Started(self) - - def receive = { - case m => sender() ! m - } - } - val extractEntityId: ShardRegion.ExtractEntityId = { case id: Int => (id.toString, id) } @@ -46,43 +27,20 @@ object ClusterShardingRememberEntitiesSpec { } -abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String, val rememberEntities: Boolean) - extends MultiNodeConfig { +abstract class ClusterShardingRememberEntitiesSpecConfig(mode: String, rememberEntities: Boolean) + extends MultiNodeClusterShardingConfig( + mode, + rememberEntities, + additionalConfig = s""" + akka.testconductor.barrier-timeout = 60 s + akka.test.single-expect-default = 60 s + akka.persistence.journal.leveldb-shared.store.native = off + """) { + val first = role("first") val second = role("second") val third = role("third") - val targetDir = s"target/ClusterShardingRememberEntitiesSpec-$mode-remember-$rememberEntities" - - val modeConfig = - if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty - else ConfigFactory.parseString(s""" - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared.timeout = 5s - akka.persistence.journal.leveldb-shared.store.native = off - akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots" - """) - - commonConfig( - modeConfig - .withFallback(ConfigFactory.parseString(s""" - akka.actor.provider = "cluster" - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.remote.log-remote-lifecycle-events = off - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = $targetDir/sharding-ddata - map-size = 10 MiB - } - akka.testconductor.barrier-timeout = 60 s - akka.test.single-expect-default = 60 s - """)) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) - nodeConfig(third)(ConfigFactory.parseString(s""" akka.cluster.sharding.distributed-data.durable.lmdb { # use same directory when starting new node on third (not used at same time) @@ -99,10 +57,10 @@ class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Bool class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities) -abstract class PersistentClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean) +abstract class PersistentClusterShardingRememberEntitiesSpec(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpec( new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities)) -abstract class DDataClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean) +abstract class DDataClusterShardingRememberEntitiesSpec(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpec(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities)) class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode1 @@ -127,45 +85,20 @@ class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode1 extends DDataClus class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(false) class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(false) -abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememberEntitiesSpecConfig) - extends MultiNodeSpec(config) - with STMultiNodeSpec +abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShardingRememberEntitiesSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { import ClusterShardingRememberEntitiesSpec._ - import config._ - - override def initialParticipants: Int = roles.size + import MultiNodeClusterShardingSpec.EntityActor + import multiNodeConfig._ val dataType = "Entity" - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - awaitAssert { - Cluster(system).state.isMemberUp(node(from).address) - } - } - enterBarrier(from.name + "-joined") - } - - val cluster = Cluster(system) - def startSharding(sys: ActorSystem, probe: ActorRef): ActorRef = { - ClusterSharding(sys).start( + startSharding( + sys, typeName = dataType, - entityProps = ClusterShardingRememberEntitiesSpec.props(probe), + entityProps = Props(new EntityActor(probe)), settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities), extractEntityId = extractEntityId, extractShardId = extractShardId) @@ -173,45 +106,24 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb lazy val region = ClusterSharding(system).shardRegion(dataType) - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - - def expectEntityRestarted(sys: ActorSystem, event: Int, probe: TestProbe, entityProbe: TestProbe): Started = { + def expectEntityRestarted( + sys: ActorSystem, + event: Int, + probe: TestProbe, + entityProbe: TestProbe): EntityActor.Started = { if (!rememberEntities) { probe.send(ClusterSharding(sys).shardRegion(dataType), event) probe.expectMsg(1) } - entityProbe.expectMsgType[Started](30.seconds) + entityProbe.expectMsgType[EntityActor.Started](30.seconds) } - def setStoreIfNotDdata(sys: ActorSystem): Unit = - if (!isDdataMode) { - val probe = TestProbe()(sys) - sys.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe.ref) - val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, sys) - } - s"Cluster sharding with remember entities ($mode)" must { - if (!isDdataMode) { - "setup shared journal" in { - // start the Persistence extension - Persistence(system) - runOn(first) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("persistence-started") - - runOn(first, second, third) { - setStoreIfNotDdata(system) - } - - enterBarrier("after-1") - } - } - "start remembered entities when coordinator fail over" in within(30.seconds) { + startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third)) + val entityProbe = TestProbe() val probe = TestProbe() join(second, second) @@ -219,7 +131,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb startSharding(system, entityProbe.ref) probe.send(region, 1) probe.expectMsg(1) - entityProbe.expectMsgType[Started] + entityProbe.expectMsgType[EntityActor.Started] } enterBarrier("second-started") @@ -269,7 +181,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb val entityProbe2 = TestProbe()(sys2) val probe2 = TestProbe()(sys2) - setStoreIfNotDdata(sys2) + if (!isDdataMode) setStore(sys2, storeOn = first) Cluster(sys2).join(Cluster(sys2).selfAddress) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRolePartitioningSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRolePartitioningSpec.scala index 76ddd0e12c..1b2ff0149a 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRolePartitioningSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRolePartitioningSpec.scala @@ -5,10 +5,7 @@ package akka.cluster.sharding import akka.actor._ -import akka.cluster.MultiNodeClusterSpec import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats } -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.testkit._ import com.typesafe.config.{ Config, ConfigFactory } @@ -39,7 +36,7 @@ object E2 { } } -abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeConfig { +abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeClusterShardingConfig { val first = role("first") val second = role("second") val third = role("third") @@ -49,19 +46,6 @@ abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeConfig { val r1Config: Config = ConfigFactory.parseString("""akka.cluster.roles = [ "R1" ]""") val r2Config: Config = ConfigFactory.parseString("""akka.cluster.roles = [ "R2" ]""") - commonConfig( - ConfigFactory - .parseString(""" - akka.actor.provider = "cluster" - akka.cluster.sharding.state-store-mode = "ddata" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingMinMembersSpec/sharding- - map-size = 10 MiB - } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) - } object ClusterShardingMinMembersPerRoleNotConfiguredConfig extends ClusterShardingMinMembersPerRoleConfig { @@ -110,14 +94,11 @@ class ClusterShardingMinMembersPerRoleSpecMultiJvmNode3 extends ClusterShardingM class ClusterShardingMinMembersPerRoleSpecMultiJvmNode4 extends ClusterShardingMinMembersPerRoleSpec class ClusterShardingMinMembersPerRoleSpecMultiJvmNode5 extends ClusterShardingMinMembersPerRoleSpec -abstract class ClusterShardingRolePartitioningSpec(config: ClusterShardingMinMembersPerRoleConfig) - extends MultiNodeSpec(config) - with MultiNodeClusterSpec +abstract class ClusterShardingRolePartitioningSpec(multiNodeConfig: ClusterShardingMinMembersPerRoleConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { - import config._ - - override def initialParticipants: Int = roles.size + import multiNodeConfig._ private val fourthAddress = node(fourth).address private val fifthAddress = node(fifth).address @@ -126,20 +107,22 @@ abstract class ClusterShardingRolePartitioningSpec(config: ClusterShardingMinMem "start the cluster, await convergence, init sharding on every node: 2 data types, 'akka.cluster.min-nr-of-members=2', partition shard location by 2 roles" in { // start sharding early - ClusterSharding(system).start( + startSharding( + system, typeName = E1.TypeKey, entityProps = TestActors.echoActorProps, // nodes 1,2,3: role R1, shard region E1, proxy region E2 - settings = ClusterShardingSettings(system).withRole("R1"), + settings = settings.withRole("R1"), extractEntityId = E1.extractEntityId, extractShardId = E1.extractShardId) // when run on first, second and third (role R1) proxy region is started - ClusterSharding(system).start( + startSharding( + system, typeName = E2.TypeKey, entityProps = TestActors.echoActorProps, // nodes 4,5: role R2, shard region E2, proxy region E1 - settings = ClusterShardingSettings(system).withRole("R2"), + settings = settings.withRole("R2"), extractEntityId = E2.extractEntityId, extractShardId = E2.extractShardId) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala index 14bb6f4af7..8ee217a0ab 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala @@ -4,53 +4,25 @@ package akka.cluster.sharding -import scala.concurrent.duration._ - import akka.actor._ -import akka.cluster.Cluster -import akka.cluster.MultiNodeClusterSpec import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ -import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ /** * one-to-one mapping between shards and entities is not efficient but some use that anyway */ -object ClusterShardingSingleShardPerEntitySpec { - class Entity extends Actor { - def receive = { - case id: Int => sender() ! id - } - } +object ClusterShardingSingleShardPerEntitySpecConfig + extends MultiNodeClusterShardingConfig(additionalConfig = "akka.cluster.sharding.updating-state-timeout = 1s") { - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = { - case id: Int => id.toString - } - -} - -object ClusterShardingSingleShardPerEntitySpecConfig extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") val fifth = role("fifth") - commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.cluster.sharding.state-store-mode = ddata - akka.cluster.sharding.updating-state-timeout = 1s - """).withFallback(MultiNodeClusterSpec.clusterConfig)) - testTransport(on = true) } @@ -61,29 +33,21 @@ class ClusterShardingSingleShardPerEntitySpecMultiJvmNode4 extends ClusterShardi class ClusterShardingSingleShardPerEntitySpecMultiJvmNode5 extends ClusterShardingSingleShardPerEntitySpec abstract class ClusterShardingSingleShardPerEntitySpec - extends MultiNodeSpec(ClusterShardingSingleShardPerEntitySpecConfig) - with STMultiNodeSpec + extends MultiNodeClusterShardingSpec(ClusterShardingSingleShardPerEntitySpecConfig) with ImplicitSender { - import ClusterShardingSingleShardPerEntitySpec._ import ClusterShardingSingleShardPerEntitySpecConfig._ - - override def initialParticipants = roles.size + import MultiNodeClusterShardingSpec.ShardedEntity def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - startSharding() - } - enterBarrier(from.name + "-joined") - } - - def startSharding(): Unit = { - ClusterSharding(system).start( - typeName = "Entity", - entityProps = Props[Entity], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId) + join( + from, + to, + startSharding( + system, + typeName = "Entity", + entityProps = Props[ShardedEntity], + extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId, + extractShardId = MultiNodeClusterShardingSpec.intExtractShardId)) } lazy val region = ClusterSharding(system).shardRegion("Entity") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 763afc17fa..b21d1c4f9e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -4,33 +4,22 @@ package akka.cluster.sharding +import akka.actor._ +import akka.cluster.Cluster import akka.cluster.ddata.{ Replicator, ReplicatorSettings } import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped } -import akka.cluster.sharding.ShardRegion.Passivate -import akka.cluster.sharding.ShardRegion.GetCurrentRegions -import akka.cluster.sharding.ShardRegion.CurrentRegions -import language.postfixOps -import scala.concurrent.duration._ - -import com.typesafe.config.ConfigFactory -import akka.actor._ -import akka.cluster.{ Cluster, MultiNodeClusterSpec } -import akka.persistence.PersistentActor -import akka.persistence.Persistence -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.persistence.journal.leveldb.SharedLeveldbStore -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import akka.testkit._ -import akka.testkit.TestEvent.Mute -import java.io.File - -import org.apache.commons.io.FileUtils -import akka.cluster.singleton.ClusterSingletonManager -import akka.cluster.singleton.ClusterSingletonManagerSettings +import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate } +import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings } import akka.pattern.BackoffOpts +import akka.persistence.{ Persistence, PersistentActor } +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } +import akka.remote.testconductor.RoleName +import akka.testkit.TestEvent.Mute +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ +import scala.language.postfixOps object ClusterShardingSpec { //#counter-actor @@ -118,8 +107,8 @@ object ClusterShardingSpec { } -abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStrategy: String = "all") - extends MultiNodeConfig { +abstract class ClusterShardingSpecConfig(mode: String, val entityRecoveryStrategy: String = "all") + extends MultiNodeClusterShardingConfig(mode) { val controller = role("controller") val first = role("first") @@ -129,30 +118,22 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr val fifth = role("fifth") val sixth = role("sixth") - commonConfig( - ConfigFactory - .parseString(s""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.remote.log-remote-lifecycle-events = off - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s + /** This is the only test that creates the shared store regardless of mode, + * because it uses a PersistentActor. So unlike all other uses of + * `MultiNodeClusterShardingConfig`, we use `MultiNodeConfig.commonConfig` here, + * and call `MultiNodeClusterShardingConfig.persistenceConfig` which does not check + * mode, then leverage the common config and fallbacks after these specific test configs: + */ + commonConfig(ConfigFactory.parseString(s""" akka.cluster.roles = ["backend"] akka.cluster.distributed-data.gossip-interval = 1s - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared.store { - native = off - dir = "target/ClusterShardingSpec/journal" - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/ClusterShardingSpec/snapshots" + akka.persistence.journal.leveldb-shared.timeout = 10s #the original default, base test uses 5s akka.cluster.sharding { retry-interval = 1 s handoff-timeout = 10 s shard-start-timeout = 5s entity-restart-backoff = 1s rebalance-interval = 2 s - state-store-mode = "$mode" entity-recovery-strategy = "$entityRecoveryStrategy" entity-recovery-constant-rate-strategy { frequency = 1 ms @@ -162,10 +143,6 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr rebalance-threshold = 1 max-simultaneous-rebalance = 1 } - distributed-data.durable.lmdb { - dir = target/ClusterShardingSpec/sharding-ddata - map-size = 10 MiB - } } akka.testconductor.barrier-timeout = 70s @@ -179,12 +156,11 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr "${ClusterShardingSpec.Stop.getClass.getName}" = java-test "${classOf[ClusterShardingSpec.CounterChanged].getName}" = java-test "${classOf[ShardRegion.Passivate].getName}" = java-test - + } - """) - .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) + """).withFallback(MultiNodeClusterShardingConfig.persistenceConfig(targetDir)).withFallback(common)) + nodeConfig(sixth) { ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""") } @@ -226,11 +202,13 @@ object ClusterShardingDocCode { } -object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence") -object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata") +object PersistentClusterShardingSpecConfig + extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData) object PersistentClusterShardingWithEntityRecoverySpecConfig - extends ClusterShardingSpecConfig("persistence", "constant") -object DDataClusterShardingWithEntityRecoverySpecConfig extends ClusterShardingSpecConfig("ddata", "constant") + extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence, "constant") +object DDataClusterShardingWithEntityRecoverySpecConfig + extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData, "constant") class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig) class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig) @@ -271,32 +249,14 @@ class DDataClusterShardingWithEntityRecoveryMultiJvmNode5 extends DDataClusterSh class DDataClusterShardingWithEntityRecoveryMultiJvmNode6 extends DDataClusterShardingWithEntityRecoverySpec class DDataClusterShardingWithEntityRecoveryMultiJvmNode7 extends DDataClusterShardingWithEntityRecoverySpec -abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) - extends MultiNodeSpec(config) - with MultiNodeClusterSpec - with STMultiNodeSpec +abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) + extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { import ClusterShardingSpec._ - import config._ - - val storageLocations = List( - new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination(): Unit = { - storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) - } + import multiNodeConfig._ def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - createCoordinator() - } - enterBarrier(from.name + "-joined") + join(from, to, createCoordinator()) } lazy val replicator = system.actorOf( @@ -381,8 +341,6 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) lazy val rebalancingPersistentRegion = createRegion("RebalancingRememberCounter", rememberEntities = true) lazy val autoMigrateRegion = createRegion("AutoMigrateRememberRegionTest", rememberEntities = true) - def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster sharding ($mode)" must { // must be done also in ddata mode since Counter is PersistentActor diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala index 8115f23c1c..e0164e50c2 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala @@ -4,42 +4,24 @@ package akka.cluster.sharding -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.Address -import akka.actor.PoisonPill -import akka.actor.Props +import akka.actor.{ Actor, ActorLogging, Address, Props } import akka.cluster.Cluster -import akka.cluster.MultiNodeClusterSpec -import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Get -import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Home -import akka.cluster.sharding.external.ExternalShardAllocation -import akka.cluster.sharding.external.ExternalShardAllocationStrategy -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec +import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.{ Get, Home } +import akka.cluster.sharding.external.{ ExternalShardAllocation, ExternalShardAllocationStrategy } import akka.serialization.jackson.CborSerializable -import akka.testkit.ImplicitSender -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory +import akka.testkit.{ ImplicitSender, TestProbe } import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration._ -object ExternalShardAllocationSpecConfig extends MultiNodeConfig { - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "cluster" +object ExternalShardAllocationSpecConfig + extends MultiNodeClusterShardingConfig(additionalConfig = """ akka.cluster.sharding { - distributed-data.durable.lmdb { - dir = target/ExternalShardAllocationSpec/sharding-ddata - map-size = 10 MiB - } retry-interval = 2000ms waiting-for-state-timeout = 2000ms rebalance-interval = 1s } - """).withFallback(MultiNodeClusterSpec.clusterConfig)) + """) { val first = role("first") val second = role("second") @@ -82,14 +64,13 @@ object ExternalShardAllocationSpec { } abstract class ExternalShardAllocationSpec - extends MultiNodeSpec(ExternalShardAllocationSpecConfig) - with MultiNodeClusterSpec + extends MultiNodeClusterShardingSpec(ExternalShardAllocationSpecConfig) with ImplicitSender with ScalaFutures { - import ExternalShardAllocationSpecConfig._ - import ExternalShardAllocationSpec._ import ExternalShardAllocationSpec.GiveMeYourHome._ + import ExternalShardAllocationSpec._ + import ExternalShardAllocationSpecConfig._ override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.second) @@ -102,16 +83,13 @@ abstract class ExternalShardAllocationSpec enterBarrier("cluster-started") } - lazy val shardRegion = { - ClusterSharding(system).start( - typeName = typeName, - entityProps = Props[GiveMeYourHome], - settings = ClusterShardingSettings(system), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - new ExternalShardAllocationStrategy(system, typeName), - PoisonPill) - } + lazy val shardRegion = startSharding( + system, + typeName = typeName, + entityProps = Props[GiveMeYourHome], + extractEntityId = extractEntityId, + extractShardId = extractShardId, + allocationStrategy = new ExternalShardAllocationStrategy(system, typeName)) "start cluster sharding" in { shardRegion diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala index bf1ee002e8..bcc1df84bb 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala @@ -4,23 +4,16 @@ package akka.cluster.sharding -import scala.concurrent.duration._ - -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Address -import akka.actor.Props -import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.cluster.sharding.ShardRegion.CurrentRegions -import akka.cluster.sharding.ShardRegion.GetCurrentRegions +import akka.actor.{ Actor, ActorRef, Address, Props } +import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions } +import akka.cluster.{ Cluster, MemberStatus } import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec import akka.serialization.jackson.CborSerializable import akka.testkit._ -import com.typesafe.config.ConfigFactory import akka.util.ccompat._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ @ccompatUsedUntil213 object MultiDcClusterShardingSpec { @@ -50,26 +43,23 @@ object MultiDcClusterShardingSpec { } } -object MultiDcClusterShardingSpecConfig extends MultiNodeConfig { +object MultiDcClusterShardingSpecConfig + extends MultiNodeClusterShardingConfig( + loglevel = "DEBUG", //issue #23741 + additionalConfig = s""" + akka.cluster { + debug.verbose-heartbeat-logging = on + debug.verbose-gossip-logging = on + sharding.retry-interval = 200ms + } + akka.remote.log-remote-lifecycle-events = on + """) { + val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(ConfigFactory.parseString(s""" - akka.loglevel = DEBUG # issue #23741 - akka.cluster { - debug.verbose-heartbeat-logging = on - debug.verbose-gossip-logging = on - downing-provider-class = akka.cluster.testkit.AutoDowning - testkit.auto-down-unreachable-after = 0s - sharding { - retry-interval = 200ms - } - } - akka.remote.log-remote-lifecycle-events = on - """).withFallback(MultiNodeClusterSpec.clusterConfig)) - nodeConfig(first, second) { ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1") } @@ -85,34 +75,32 @@ class MultiDcClusterShardingSpecMultiJvmNode3 extends MultiDcClusterShardingSpec class MultiDcClusterShardingSpecMultiJvmNode4 extends MultiDcClusterShardingSpec abstract class MultiDcClusterShardingSpec - extends MultiNodeSpec(MultiDcClusterShardingSpecConfig) - with MultiNodeClusterSpec - with STMultiNodeSpec + extends MultiNodeClusterShardingSpec(MultiDcClusterShardingSpecConfig) with ImplicitSender { import MultiDcClusterShardingSpec._ import MultiDcClusterShardingSpecConfig._ def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster.join(node(to).address) - startSharding() - withClue( - s"Failed waiting for ${cluster.selfUniqueAddress} to be up. Current state: ${cluster.state}" + cluster.state) { - within(15.seconds) { - awaitAssert(cluster.state.members.exists { m => - m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up - } should be(true)) + join( + from, + to, { + startSharding() + withClue( + s"Failed waiting for ${cluster.selfUniqueAddress} to be up. Current state: ${cluster.state}" + cluster.state) { + within(15.seconds) { + awaitAssert(cluster.state.members.exists { m => + m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up + } should be(true)) + } } - } - } - enterBarrier(from.name + "-joined") + }) } def startSharding(): Unit = { - ClusterSharding(system).start( + startSharding( + system, typeName = "Entity", entityProps = Props[Entity](), - settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala index 1ec0f286ef..f074f56467 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala @@ -12,6 +12,7 @@ import akka.remote.testkit.MultiNodeConfig import com.typesafe.config.{ Config, ConfigFactory } object MultiNodeClusterShardingConfig { + private[sharding] def testNameFromCallStack(classToStartFrom: Class[_]): String = { def isAbstractClass(className: String): Boolean = { @@ -53,6 +54,21 @@ object MultiNodeClusterShardingConfig { .replaceAll("""\$\$?\w+""", "") // drop scala anonymous functions/classes .replaceAll("[^a-zA-Z_0-9]", "_") } + + def persistenceConfig(targetDir: String): Config = + ConfigFactory.parseString(s""" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "$targetDir/journal" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots" + """) + } /** @@ -63,48 +79,42 @@ object MultiNodeClusterShardingConfig { * * @param mode the state store mode * @param rememberEntities defaults to off - * @param overrides additional config + * @param additionalConfig additional config * @param loglevel defaults to INFO */ abstract class MultiNodeClusterShardingConfig( val mode: String = ClusterShardingSettings.StateStoreModeDData, val rememberEntities: Boolean = false, - overrides: Config = ConfigFactory.empty, + additionalConfig: String = "", loglevel: String = "INFO") extends MultiNodeConfig { import MultiNodeClusterShardingConfig._ val targetDir = - s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig])}Spec-$mode-remember-$rememberEntities" + s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig]).replace("Config", "").replace("_", "")}" - val modeConfig = + val persistenceConfig: Config = if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty - else ConfigFactory.parseString(s""" - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared.timeout = 5s - akka.persistence.journal.leveldb-shared.store.native = off - akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots" - """) + else MultiNodeClusterShardingConfig.persistenceConfig(targetDir) - commonConfig( - overrides - .withFallback(modeConfig) - .withFallback(ConfigFactory.parseString(s""" - akka.loglevel = $loglevel - akka.actor.provider = "cluster" - akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning - akka.cluster.testkit.auto-down-unreachable-after = 0s - akka.remote.log-remote-lifecycle-events = off - akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = $targetDir/sharding-ddata - map-size = 10 MiB - } - """)) + val common: Config = + ConfigFactory + .parseString(s""" + akka.actor.provider = "cluster" + akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning + akka.cluster.testkit.auto-down-unreachable-after = 0s + akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = $targetDir/sharding-ddata + map-size = 10 MiB + } + akka.loglevel = $loglevel + akka.remote.log-remote-lifecycle-events = off + """) .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest) - .withFallback(MultiNodeClusterSpec.clusterConfig)) + .withFallback(MultiNodeClusterSpec.clusterConfig) + + commonConfig(ConfigFactory.parseString(additionalConfig).withFallback(persistenceConfig).withFallback(common)) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala index 30deb8d2a3..dceebb54d5 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala @@ -6,38 +6,66 @@ package akka.cluster.sharding import java.io.File -import scala.concurrent.duration._ - -import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Identify, PoisonPill, Props } -import akka.cluster.{ Cluster, MultiNodeClusterSpec } +import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, Identify, PoisonPill, Props } +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.persistence.Persistence import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec -import akka.testkit.TestProbe -import akka.util.ccompat.ccompatUsedUntil213 +import akka.serialization.jackson.CborSerializable +import akka.testkit.{ TestActors, TestProbe } +import akka.util.ccompat._ import org.apache.commons.io.FileUtils +import scala.concurrent.duration._ + @ccompatUsedUntil213 object MultiNodeClusterShardingSpec { - final case class EntityStarted(ref: ActorRef) - - def props(probe: ActorRef): Props = Props(new EntityActor(probe)) + object EntityActor { + final case class Started(ref: ActorRef) + } class EntityActor(probe: ActorRef) extends Actor { - probe ! EntityStarted(self) + probe ! EntityActor.Started(self) def receive: Receive = { case m => sender() ! m } } - val defaultExtractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) + object PingPongActor { + case object Stop extends CborSerializable + case class Ping(id: Long) extends CborSerializable + case object Pong extends CborSerializable } - val defaultExtractShardId: ShardRegion.ExtractShardId = msg => + class PingPongActor extends Actor with ActorLogging { + import PingPongActor._ + log.info(s"entity started {}", self.path) + def receive: Receive = { + case Stop => context.stop(self) + case _: Ping => sender() ! Pong + } + } + + object ShardedEntity { + case object Stop + } + + class ShardedEntity extends Actor { + def receive: Receive = { + case id: Int => sender() ! id + case ShardedEntity.Stop => + context.stop(self) + } + } + + val intExtractEntityId: ShardRegion.ExtractEntityId = { + case id: Int => (id.toString, id) + } + val intExtractShardId: ShardRegion.ExtractShardId = msg => msg match { case id: Int => id.toString case ShardRegion.StartEntity(id) => id @@ -58,7 +86,12 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding override def initialParticipants: Int = roles.size - protected val storageLocations = List( + protected lazy val settings = ClusterShardingSettings(system).withRememberEntities(config.rememberEntities) + + private lazy val defaultShardAllocationStrategy = + ClusterSharding(system).defaultShardAllocationStrategy(settings) + + protected lazy val storageLocations = List( new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) override protected def atStartup(): Unit = { @@ -72,43 +105,91 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding super.afterTermination() } - protected def join(from: RoleName, to: RoleName): Unit = { + /** + * Flexible cluster join pattern usage. + * + * @param from the node the `Cluster.join` is `runOn` + * @param to the node to join to + * @param onJoinedRunOnFrom optionally execute a function after join validation + * is successful, e.g. start sharding or create coordinator + * @param assertNodeUp if disabled - false, the joining member's `MemberStatus.Up` + * and similar assertions are not run. This allows tests that were + * not doing assertions (e.g. ClusterShardingMinMembersSpec) or + * doing them after `onJoinedRunOnFrom` more flexibility. + * Defaults to true, running member status checks. + */ + protected def join( + from: RoleName, + to: RoleName, + onJoinedRunOnFrom: => Unit = (), + assertNodeUp: Boolean = true, + max: FiniteDuration = 20.seconds): Unit = { + runOn(from) { - Cluster(system).join(node(to).address) - awaitAssert { - Cluster(system).state.isMemberUp(node(from).address) + cluster.join(node(to).address) + if (assertNodeUp) { + within(max) { + awaitAssert { + cluster.state.isMemberUp(node(from).address) + } + } } + onJoinedRunOnFrom } enterBarrier(from.name + "-joined") } protected def startSharding( sys: ActorSystem, - entityProps: Props, - dataType: String, - extractEntityId: ShardRegion.ExtractEntityId = defaultExtractEntityId, - extractShardId: ShardRegion.ExtractShardId = defaultExtractShardId, + typeName: String, + entityProps: Props = TestActors.echoActorProps, + settings: ClusterShardingSettings = settings, + extractEntityId: ShardRegion.ExtractEntityId = intExtractEntityId, + extractShardId: ShardRegion.ExtractShardId = intExtractShardId, + allocationStrategy: ShardAllocationStrategy = defaultShardAllocationStrategy, handOffStopMessage: Any = PoisonPill): ActorRef = { ClusterSharding(sys).start( - typeName = dataType, - entityProps = entityProps, - settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities), - extractEntityId = extractEntityId, - extractShardId = extractShardId, - ClusterSharding(sys).defaultShardAllocationStrategy(ClusterShardingSettings(sys)), + typeName, + entityProps, + settings, + extractEntityId, + extractShardId, + allocationStrategy, handOffStopMessage) } + protected def startProxy( + sys: ActorSystem, + typeName: String, + role: Option[String], + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId): ActorRef = { + ClusterSharding(sys).startProxy(typeName, role, extractEntityId, extractShardId) + } + protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - private def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit = - if (!isDdataMode) { - val probe = TestProbe()(sys) - sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref) - val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get - SharedLeveldbJournal.setStore(sharedStore, sys) - } + protected def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit = + if (!isDdataMode) setStore(sys, storeOn) + + protected def setStore(sys: ActorSystem, storeOn: RoleName): Unit = { + val probe = TestProbe()(sys) + sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref) + val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, sys) + } + + /** + * {{{ + * startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third)) + * }}} + * + * @param startOn the node to start the `SharedLeveldbStore` store on + * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on + */ + protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = + if (!isDdataMode) startPersistence(startOn, setStoreOn) /** * {{{ @@ -118,21 +199,20 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding * @param startOn the node to start the `SharedLeveldbStore` store on * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on */ - protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = - if (!isDdataMode) { + protected def startPersistence(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = { + info("Setting up setup shared journal.") - Persistence(system) - runOn(startOn) { - system.actorOf(Props[SharedLeveldbStore], "store") - } - enterBarrier("persistence-started") - - runOn(setStoreOn: _*) { - setStoreIfNotDdataMode(system, startOn) - } - - enterBarrier(s"after-${startOn.name}") + Persistence(system) + runOn(startOn) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("persistence-started") + runOn(setStoreOn: _*) { + setStore(system, startOn) } + enterBarrier(s"after-${startOn.name}") + } + } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 2f14fcf0e3..57fb4693cc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -98,7 +98,7 @@ object MultiNodeClusterSpec { trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec => - override def initialParticipants = roles.size + override def initialParticipants: Int = roles.size private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]