diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala new file mode 100644 index 0000000000..177211b701 --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.actor.typed.{ ActorRef, Props } +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec } +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } +import akka.testkit.typed.scaladsl.TestProbe +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +object MultiDcClusterShardingSpecConfig extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig( + ConfigFactory.parseString( + """ + akka.loglevel = DEBUG + """).withFallback( + MultiNodeTypedClusterSpec.clusterConfig)) + + nodeConfig(first, second)(ConfigFactory.parseString( + """ + akka.cluster.multi-data-center.self-data-center = "dc1" + """)) + + nodeConfig(third, fourth)(ConfigFactory.parseString( + """ + akka.cluster.multi-data-center.self-data-center = "dc2" + """)) + + testTransport(on = true) +} + +class MultiDcClusterShardingMultiJvmNode1 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode2 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode3 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode4 extends MultiDcClusterShardingSpec + +abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig) + with MultiNodeTypedClusterSpec with ScalaFutures { + + import MultiDcClusterShardingSpecConfig._ + import MultiDcClusterActors._ + + val typeKey = EntityTypeKey[PingProtocol]("ping") + val entityId = "ping-1" + + "Cluster sharding in multi dc cluster" must { + "form cluster" in { + formCluster(first, second, third, fourth) + } + + "start sharding" in { + val sharding = ClusterSharding(typedSystem) + val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn( + _ ⇒ multiDcPinger, + Props.empty, + typeKey = typeKey, + ClusterShardingSettings(typedSystem), + 10, + NoMore + ) + val probe = TestProbe[Pong] + shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref)) + probe.expectMessage(Pong(cluster.selfMember.dataCenter)) + enterBarrier("sharding-started") + } + + "be able to message via entity ref" in { + val probe = TestProbe[Pong] + val entityRef = ClusterSharding(typedSystem).entityRefFor(typeKey, entityId) + entityRef ! Ping(probe.ref) + probe.expectMessage(Pong(cluster.selfMember.dataCenter)) + enterBarrier("entity-ref") + } + } + + "be able to ask via entity ref" in { + implicit val timeout = Timeout(remainingOrDefault) + val entityRef = ClusterSharding(typedSystem).entityRefFor(typeKey, entityId) + val response = entityRef ? Ping + response.futureValue shouldEqual Pong(cluster.selfMember.dataCenter) + enterBarrier("ask") + } + + "be able to message cross dc via proxy" in { + runOn(first, second) { + val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn( + _ ⇒ multiDcPinger, + Props.empty, + typeKey = typeKey, + ClusterShardingSettings(typedSystem).withDataCenter("dc2"), + 10, + NoMore + ) + val probe = TestProbe[Pong] + proxy ! ShardingEnvelope(entityId, Ping(probe.ref)) + probe.expectMessage(remainingOrDefault, Pong("dc2")) + } + enterBarrier("done") + } +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 504457117c..564f9812f3 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -168,7 +168,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap private val proxies: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap - private lazy val guardian = { + private lazy val guardian: ActorRef = { val guardianName: String = system.settings.config.getString("akka.cluster.sharding.guardian-name") val dispatcher = system.settings.config @@ -610,7 +610,7 @@ private[akka] class ClusterShardingGuardian extends Actor { context.system.deadLetters } - def receive = { + def receive: Receive = { case Start(typeName, entityProps, settings, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index fffea36926..d50096ee69 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -323,9 +323,6 @@ object ShardRegion { */ final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable - private def roleOption(role: String): Option[String] = - if (role == "") None else Option(role) - /** * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of * them have terminated it replies with `ShardStopped`. @@ -461,7 +458,7 @@ private[akka] class ShardRegion( } } - def receive = { + def receive: Receive = { case Terminated(ref) ⇒ receiveTerminated(ref) case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender()) case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt) @@ -759,7 +756,7 @@ private[akka] class ShardRegion( getShard(shardId) case None ⇒ if (!shardBuffers.contains(shardId)) { - log.debug("Request shard [{}] home", shardId) + log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator) coordinator.foreach(_ ! GetShardHome(shardId)) } val buf = shardBuffers.getOrEmpty(shardId) @@ -788,7 +785,7 @@ private[akka] class ShardRegion( context.system.deadLetters ! msg case None ⇒ if (!shardBuffers.contains(shardId)) { - log.debug("Request shard [{}] home", shardId) + log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator) coordinator.foreach(_ ! GetShardHome(shardId)) } bufferMessage(shardId, msg, snd) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala index c61ca6fbfc..062d5aee86 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala @@ -123,7 +123,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh }, 10.seconds) } - s"Cluster sharding in multi data center cluster" must { + "Cluster sharding in multi data center cluster" must { "join cluster" in within(20.seconds) { join(first, first) join(second, first) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 31b61ea01f..bed25c848e 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -97,6 +97,8 @@ final class ClusterSingletonProxySettings( def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings = copy(dataCenter = Some(dataCenter)) + def withDataCenter(dataCenter: Option[DataCenter]): ClusterSingletonProxySettings = copy(dataCenter = dataCenter) + def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings = copy(singletonIdentificationInterval = singletonIdentificationInterval) diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala index b4ddd84f91..2a37da38fd 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala @@ -5,29 +5,24 @@ package akka.cluster.singleton import language.postfixOps -import scala.collection.immutable import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef -import akka.actor.Address import akka.actor.Props import akka.actor.RootActorPath import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -import akka.cluster.Member import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.testkit.TestEvent._ -import akka.actor.Terminated import akka.actor.Identify import akka.actor.ActorIdentity import akka.actor.ActorSelection -import akka.cluster.MemberStatus object ClusterSingletonManagerSpec extends MultiNodeConfig { val controller = role("controller") diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index a95aa1945a..1036337750 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -88,17 +88,21 @@ final class ClusterSingletonSettings( * INTERNAL API: */ @InternalApi - private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings = + private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings = { new ClusterSingletonProxySettings(singletonName, role, singletonIdentificationInterval, bufferSize) + .withDataCenter(dataCenter) + } /** * INTERNAL API: */ @InternalApi - private[akka] def shouldRunManager(cluster: Cluster): Boolean = + private[akka] def shouldRunManager(cluster: Cluster): Boolean = { (role.isEmpty || cluster.selfMember.roles(role.get)) && (dataCenter.isEmpty || dataCenter.contains(cluster.selfMember.dataCenter)) + } + override def toString = s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize)" } object ClusterSingleton extends ExtensionId[ClusterSingleton] { @@ -141,7 +145,6 @@ abstract class ClusterSingleton extends Extension { settings: ClusterSingletonSettings, terminationMessage: A ): ActorRef[A] - } object ClusterSingletonManagerSettings { diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala index 0dae56b44d..72baf03838 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterSingletonImpl.scala @@ -14,6 +14,7 @@ import akka.actor.typed.Behavior.UntypedPropsBehavior import akka.cluster.typed.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings } import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } +import akka.cluster.ClusterSettings.DataCenter /** * INTERNAL API: @@ -21,20 +22,21 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } @InternalApi private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton { require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton") + import ClusterSingletonImpl._ import akka.actor.typed.scaladsl.adapter._ private lazy val cluster = Cluster(system) private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem] - private val proxies = new ConcurrentHashMap[String, ActorRef[_]]() + private val proxies = new ConcurrentHashMap[(String, Option[DataCenter]), ActorRef[_]]() override def spawn[A]( behavior: Behavior[A], singletonName: String, props: Props, settings: ClusterSingletonSettings, - terminationMessage: A) = { + terminationMessage: A): ActorRef[A] = { if (settings.shouldRunManager(cluster)) { val managerName = managerNameFor(singletonName) @@ -53,15 +55,19 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex } } - val proxyCreator = new JFunction[String, ActorRef[_]] { - def apply(singletonName: String): ActorRef[_] = { - val proxyName = s"singletonProxy$singletonName" + getProxy(singletonName, settings) + } + + private def getProxy[T](name: String, settings: ClusterSingletonSettings): ActorRef[T] = { + val proxyCreator = new JFunction[(String, Option[DataCenter]), ActorRef[_]] { + def apply(singletonNameAndDc: (String, Option[DataCenter])): ActorRef[_] = { + val (singletonName, _) = singletonNameAndDc + val proxyName = s"singletonProxy$singletonName-${settings.dataCenter.getOrElse("no-dc")}" untypedSystem.systemActorOf( ClusterSingletonProxy.props(s"/system/${managerNameFor(singletonName)}", settings.toProxySettings(singletonName)), proxyName) } } - - proxies.computeIfAbsent(singletonName, proxyCreator).asInstanceOf[ActorRef[A]] + proxies.computeIfAbsent((name, settings.dataCenter), proxyCreator).asInstanceOf[ActorRef[T]] } } diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala new file mode 100644 index 0000000000..67317faebb --- /dev/null +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.typed + +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.Behaviors + +object MultiDcClusterActors { + case class Pong(dc: String) + sealed trait PingProtocol + case class Ping(ref: ActorRef[Pong]) extends PingProtocol + case object NoMore extends PingProtocol + + val multiDcPinger = Behaviors.setup[PingProtocol] { ctx ⇒ + val cluster = Cluster(ctx.system) + Behaviors.receiveMessage[PingProtocol] { + case Ping(ref) ⇒ + ref ! Pong(cluster.selfMember.dataCenter) + Behaviors.same + case NoMore ⇒ + Behaviors.stopped + } + } +} diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala new file mode 100644 index 0000000000..d5a736d3a3 --- /dev/null +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.typed + +import akka.actor.typed.Props +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.MemberStatus +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } +import akka.testkit.typed.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object MultiDcClusterSingletonSpecConfig extends MultiNodeConfig { + val first: RoleName = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + ConfigFactory.parseString( + """ + akka.loglevel = DEBUG + """).withFallback( + MultiNodeTypedClusterSpec.clusterConfig)) + + nodeConfig(first)(ConfigFactory.parseString( + """ + akka.cluster.multi-data-center.self-data-center = "dc1" + """)) + + nodeConfig(second, third)(ConfigFactory.parseString( + """ + akka.cluster.multi-data-center.self-data-center = "dc2" + """)) + + testTransport(on = true) +} + +class MultiDcClusterSingletonMultiJvmNode1 extends MultiDcClusterSingletonSpec +class MultiDcClusterSingletonMultiJvmNode2 extends MultiDcClusterSingletonSpec +class MultiDcClusterSingletonMultiJvmNode3 extends MultiDcClusterSingletonSpec + +abstract class MultiDcClusterSingletonSpec extends MultiNodeSpec(MultiDcClusterSingletonSpecConfig) + with MultiNodeTypedClusterSpec { + + import MultiDcClusterActors._ + import MultiDcClusterSingletonSpecConfig._ + + "A typed cluster with multiple data centers" must { + "be able to form" in { + runOn(first) { + cluster.manager ! Join(cluster.selfMember.address) + } + runOn(second, third) { + cluster.manager ! Join(first) + } + enterBarrier("form-cluster-join-attempt") + runOn(first, second, third) { + within(20.seconds) { + awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 3) + } + } + enterBarrier("cluster started") + } + + "be able to create and ping singleton in same DC" in { + runOn(first) { + val singleton = ClusterSingleton(typedSystem) + val pinger = singleton.spawn( + multiDcPinger, + "ping", + Props.empty, + ClusterSingletonSettings(typedSystem), + NoMore + ) + val probe = TestProbe[Pong] + pinger ! Ping(probe.ref) + probe.expectMessage(Pong("dc1")) + enterBarrier("singleton-up") + } + runOn(second, third) { + enterBarrier("singleton-up") + } + } + + "be able to ping singleton via proxy in another dc" in { + runOn(second) { + val singleton = ClusterSingleton(system.toTyped) + val pinger = singleton.spawn( + multiDcPinger, + "ping", + Props.empty, + ClusterSingletonSettings(typedSystem).withDataCenter("dc1"), + NoMore + ) + val probe = TestProbe[Pong] + pinger ! Ping(probe.ref) + probe.expectMessage(Pong("dc1")) + } + + enterBarrier("singleton-pinged") + } + + "be able to target singleton with the same name in own dc " in { + runOn(second, third) { + val singleton = ClusterSingleton(typedSystem) + val pinger = singleton.spawn( + multiDcPinger, + "ping", + Props.empty, + ClusterSingletonSettings(typedSystem), + NoMore + ) + val probe = TestProbe[Pong] + pinger ! Ping(probe.ref) + probe.expectMessage(Pong("dc2")) + } + + enterBarrier("singleton-pinged-own-dc") + } + } +} + diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala new file mode 100644 index 0000000000..717ca4aa1f --- /dev/null +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.typed + +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.{ Address, Scheduler } +import akka.actor.typed.ActorSystem +import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit.WatchedByCoroner +import org.scalatest.{ Matchers, Suite } +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.{ ClusterEvent, MemberStatus } +import akka.remote.testconductor.RoleName +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.duration._ +import scala.language.implicitConversions + +object MultiNodeTypedClusterSpec { + def clusterConfig: Config = ConfigFactory.parseString( + s""" + akka.actor.provider = cluster + akka.actor.warn-about-java-serializer-usage = off + akka.cluster { + jmx.enabled = off + gossip-interval = 200 ms + leader-actions-interval = 200 ms + unreachable-nodes-reaper-interval = 500 ms + periodic-tasks-initial-delay = 300 ms + publish-stats-interval = 0 s # always, when it happens + failure-detector.heartbeat-interval = 500 ms + + run-coordinated-shutdown-when-down = off + } + akka.loglevel = INFO + akka.log-dead-letters = off + akka.log-dead-letters-during-shutdown = off + akka.remote { + log-remote-lifecycle-events = off + artery.advanced.flight-recorder { + enabled=on + destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + } + } + akka.loggers = ["akka.testkit.TestEventListener"] + akka.test { + single-expect-default = 10 s + } + + """) + +} + +trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport with Matchers { + self: MultiNodeSpec ⇒ + + override def initialParticipants: Int = roles.size + + implicit def typedSystem: ActorSystem[Nothing] = system.toTyped + implicit def scheduler: Scheduler = system.scheduler + + private val cachedAddresses = new ConcurrentHashMap[RoleName, Address] + + // TODO: Add support for typed to multi node test kit + def cluster: Cluster = Cluster(system.toTyped) + + def clusterView: ClusterEvent.CurrentClusterState = cluster.state + + override def expectedTestDuration: FiniteDuration = 60.seconds + + /** + * Lookup the Address for the role. + * + * Implicit conversion from RoleName to Address. + * + * It is cached, which has the implication that stopping + * and then restarting a role (jvm) with another address is not + * supported. + */ + implicit def address(role: RoleName): Address = { + cachedAddresses.get(role) match { + case null ⇒ + val address = node(role).address + cachedAddresses.put(role, address) + address + case address ⇒ address + } + } + + def formCluster(first: RoleName, rest: RoleName*): Unit = { + runOn(first) { + cluster.manager ! Join(cluster.selfMember.address) + awaitAssert(cluster.state.members.exists { m ⇒ + m.uniqueAddress == cluster.selfMember.uniqueAddress && m.status == MemberStatus.Up + } should be(true)) + } + enterBarrier(first.name + "-joined") + + rest foreach { node ⇒ + runOn(node) { + cluster.manager ! Join(address(first)) + awaitAssert(cluster.state.members.exists { m ⇒ + m.uniqueAddress == cluster.selfMember.uniqueAddress && m.status == MemberStatus.Up + } should be(true)) + } + } + enterBarrier("all-joined") + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 259d6a6ec0..5bd0bd0827 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -76,7 +76,7 @@ object MultiNodeClusterSpec { class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor { import EndActor._ - def receive = { + def receive: Receive = { case SendEnd ⇒ target foreach { t ⇒ context.actorSelection(RootActorPath(t) / self.path.elements) ! End diff --git a/build.sbt b/build.sbt index 7b9dc86336..12033db6cc 100644 --- a/build.sbt +++ b/build.sbt @@ -414,26 +414,32 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") persistenceTyped % "test->test", protobuf, typedTestkit % "test->test", - actorTypedTests % "test->test" + actorTypedTests % "test->test", + remoteTests % "test->test" ) .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.cluster.typed")) .disablePlugins(MimaPlugin) + .configs(MultiJvm) + .enablePlugins(MultiNodeScalaTest) lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .dependsOn( - clusterTyped, + clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm", persistenceTyped, clusterSharding, typedTestkit % "test->test", actorTypedTests % "test->test", - persistenceTyped % "test->test" + persistenceTyped % "test->test", + remoteTests % "test->test" ) .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed")) // To be able to import ContainerFormats.proto .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .disablePlugins(MimaPlugin) + .configs(MultiJvm) + .enablePlugins(MultiNodeScalaTest) lazy val streamTyped = akkaModule("akka-stream-typed") .dependsOn(