diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 4779a79e48..5f9678d799 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -30,6 +30,7 @@ import akka.cluster.ddata.Replicator import scala.util.control.NonFatal import akka.actor.Status import akka.cluster.ClusterSettings +import akka.cluster.ClusterSettings.DataCenter /** * This extension provides sharding functionality of actors in a cluster. @@ -343,7 +344,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { role: Option[String], extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId): ActorRef = - startProxy(typeName, role, team = None, extractEntityId, extractShardId) + startProxy(typeName, role, dataCenter = None, extractEntityId, extractShardId) /** * Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, @@ -357,8 +358,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param typeName the name of the entity type * @param role specifies that this entity type is located on cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param team The team of the cluster nodes where the cluster sharding is running. - * If None then the same team as current node. + * @param dataCenter The data center of the cluster nodes where the cluster sharding is running. + * If None then the same data center as current node. * @param extractEntityId partial function to extract the entity id and the message to send to the * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream @@ -369,21 +370,21 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def startProxy( typeName: String, role: Option[String], - team: Option[String], + dataCenter: Option[DataCenter], extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId): ActorRef = { implicit val timeout = system.settings.CreationTimeout val settings = ClusterShardingSettings(system).withRole(role) - val startMsg = StartProxy(typeName, team, settings, extractEntityId, extractShardId) + val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) - // it must be possible to start several proxies, one per team - regions.put(proxyName(typeName, team), shardRegion) + // it must be possible to start several proxies, one per data center + regions.put(proxyName(typeName, dataCenter), shardRegion) shardRegion } - private def proxyName(typeName: String, team: Option[String]): String = { - team match { + private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = { + dataCenter match { case None ⇒ typeName case Some(t) ⇒ typeName + "-" + t } @@ -409,7 +410,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { typeName: String, role: Optional[String], messageExtractor: ShardRegion.MessageExtractor): ActorRef = - startProxy(typeName, role, team = Optional.empty(), messageExtractor) + startProxy(typeName, role, dataCenter = Optional.empty(), messageExtractor) /** * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, @@ -423,8 +424,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param typeName the name of the entity type * @param role specifies that this entity type is located on cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param team The team of the cluster nodes where the cluster sharding is running. - * If None then the same team as current node. + * @param dataCenter The data center of the cluster nodes where the cluster sharding is running. + * If None then the same data center as current node. * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the * entity from the incoming message * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard @@ -432,10 +433,10 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def startProxy( typeName: String, role: Optional[String], - team: Optional[String], + dataCenter: Optional[String], messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - startProxy(typeName, Option(role.orElse(null)), Option(team.orElse(null)), + startProxy(typeName, Option(role.orElse(null)), Option(dataCenter.orElse(null)), extractEntityId = { case msg if messageExtractor.entityId(msg) ne null ⇒ (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) @@ -456,13 +457,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { /** * Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the - * named entity type running in another team. A proxy within the same team can be accessed + * named entity type running in another data center. A proxy within the same data center can be accessed * with [[#shardRegion]] instead of this method. The entity type must be registered with the * [[#startProxy]] method before it can be used here. Messages to the entity is always sent * via the `ShardRegion`. */ - def shardRegionProxy(typeName: String, team: String): ActorRef = { - regions.get(proxyName(typeName, Some(team))) match { + def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef = { + regions.get(proxyName(typeName, Some(dataCenter))) match { case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first") case ref ⇒ ref } @@ -479,7 +480,7 @@ private[akka] object ClusterShardingGuardian { extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded - final case class StartProxy(typeName: String, team: Option[String], settings: ClusterShardingSettings, + final case class StartProxy(typeName: String, dataCenter: Option[DataCenter], settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded @@ -518,8 +519,8 @@ private[akka] class ClusterShardingGuardian extends Actor { case Some(r) ⇒ URLEncoder.encode(r, ByteString.UTF_8) + "Replicator" case None ⇒ "replicator" } - // Use members within the team and with the given role (if any) - val replicatorRoles = Set(ClusterSettings.TeamRolePrefix + cluster.settings.Team) ++ settings.role + // Use members within the data center and with the given role (if any) + val replicatorRoles = Set(ClusterSettings.DcRolePrefix + cluster.settings.DataCenter) ++ settings.role val ref = context.actorOf(Replicator.props(replicatorSettings.withRoles(replicatorRoles)), name) replicatorByRole = replicatorByRole.updated(settings.role, ref) ref @@ -584,14 +585,14 @@ private[akka] class ClusterShardingGuardian extends Actor { sender() ! Status.Failure(e) } - case StartProxy(typeName, team, settings, extractEntityId, extractShardId) ⇒ + case StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId) ⇒ try { val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) - // it must be possible to start several proxies, one per team - val actorName = team match { + // it must be possible to start several proxies, one per data center + val actorName = dataCenter match { case None ⇒ encName case Some(t) ⇒ URLEncoder.encode(typeName + "-" + t, ByteString.UTF_8) } @@ -599,7 +600,7 @@ private[akka] class ClusterShardingGuardian extends Actor { context.actorOf( ShardRegion.proxyProps( typeName = typeName, - team = team, + dataCenter = dataCenter, settings = settings, coordinatorPath = cPath, extractEntityId = extractEntityId, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 2d9b8eb0e4..8de68d4cd4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import scala.concurrent.Promise import akka.Done import akka.cluster.ClusterSettings +import akka.cluster.ClusterSettings.DataCenter /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -41,7 +42,7 @@ object ShardRegion { handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int): Props = - Props(new ShardRegion(typeName, Some(entityProps), team = None, settings, coordinatorPath, extractEntityId, + Props(new ShardRegion(typeName, Some(entityProps), dataCenter = None, settings, coordinatorPath, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) /** @@ -51,14 +52,14 @@ object ShardRegion { */ private[akka] def proxyProps( typeName: String, - team: Option[String], + dataCenter: Option[DataCenter], settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, replicator: ActorRef, majorityMinCap: Int): Props = - Props(new ShardRegion(typeName, None, team, settings, coordinatorPath, extractEntityId, extractShardId, + Props(new ShardRegion(typeName, None, dataCenter, settings, coordinatorPath, extractEntityId, extractShardId, PoisonPill, replicator, majorityMinCap)).withDeploy(Deploy.local) /** @@ -367,7 +368,7 @@ object ShardRegion { private[akka] class ShardRegion( typeName: String, entityProps: Option[Props], - team: Option[String], + dataCenter: Option[DataCenter], settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, @@ -422,14 +423,14 @@ private[akka] class ShardRegion( retryTask.cancel() } - // when using proxy the team can be different that the own team - private val targetTeamRole = team match { - case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t - case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team + // when using proxy the data center can be different from the own data center + private val targetDcRole = dataCenter match { + case Some(t) ⇒ ClusterSettings.DcRolePrefix + t + case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.DataCenter } def matchingRole(member: Member): Boolean = - member.hasRole(targetTeamRole) && role.forall(member.hasRole) + member.hasRole(targetDcRole) && role.forall(member.hasRole) def coordinatorSelection: Option[ActorSelection] = membersByAge.headOption.map(m ⇒ context.actorSelection(RootActorPath(m.address) + coordinatorPath)) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 77ae4d0c2f..be7b312764 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -460,7 +460,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu val proxy = system.actorOf( ShardRegion.proxyProps( typeName = "counter", - team = None, + dataCenter = None, settings, coordinatorPath = "/user/counterCoordinator/singleton/coordinator", extractEntityId = extractEntityId, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala similarity index 83% rename from akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala rename to akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala index 9710faccd4..54df6e3ebf 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala @@ -21,7 +21,7 @@ import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import com.typesafe.config.ConfigFactory -object TeamClusterShardingSpec { +object MultiDcClusterShardingSpec { sealed trait EntityMsg { def id: String } @@ -48,7 +48,7 @@ object TeamClusterShardingSpec { } } -object TeamClusterShardingSpecConfig extends MultiNodeConfig { +object MultiDcClusterShardingSpecConfig extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -62,23 +62,23 @@ object TeamClusterShardingSpecConfig extends MultiNodeConfig { """)) nodeConfig(first, second) { - ConfigFactory.parseString("akka.cluster.team = DC1") + ConfigFactory.parseString("akka.cluster.data-center = DC1") } nodeConfig(third, fourth) { - ConfigFactory.parseString("akka.cluster.team = DC2") + ConfigFactory.parseString("akka.cluster.data-center = DC2") } } -class TeamClusterShardingMultiJvmNode1 extends TeamClusterShardingSpec -class TeamClusterShardingMultiJvmNode2 extends TeamClusterShardingSpec -class TeamClusterShardingMultiJvmNode3 extends TeamClusterShardingSpec -class TeamClusterShardingMultiJvmNode4 extends TeamClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode1 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode2 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode3 extends MultiDcClusterShardingSpec +class MultiDcClusterShardingMultiJvmNode4 extends MultiDcClusterShardingSpec -abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterShardingSpecConfig) +abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig) with STMultiNodeSpec with ImplicitSender { - import TeamClusterShardingSpec._ - import TeamClusterShardingSpecConfig._ + import MultiDcClusterShardingSpec._ + import MultiDcClusterShardingSpecConfig._ override def initialParticipants = roles.size @@ -119,7 +119,7 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding }, 10.seconds) } - s"Cluster sharding with teams" must { + s"Cluster sharding in multi data center cluster" must { "join cluster" in within(20.seconds) { join(first, first) join(second, first) @@ -171,7 +171,7 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding enterBarrier("after-2") } - "not mix entities in different teams" in { + "not mix entities in different data centers" in { runOn(second) { region ! GetCount("5") expectMsg(1) @@ -183,12 +183,12 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding enterBarrier("after-3") } - "allow proxy within same team" in { + "allow proxy within same data center" in { runOn(second) { val proxy = ClusterSharding(system).startProxy( typeName = "Entity", role = None, - team = None, // by default use own team + dataCenter = None, // by default use own DC extractEntityId = extractEntityId, extractShardId = extractShardId) @@ -198,12 +198,12 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding enterBarrier("after-4") } - "allow proxy across different teams" in { + "allow proxy across different data centers" in { runOn(second) { val proxy = ClusterSharding(system).startProxy( typeName = "Entity", role = None, - team = Some("DC2"), // proxy to other DC + dataCenter = Some("DC2"), // proxy to other DC extractEntityId = extractEntityId, extractShardId = extractShardId) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index a0e9f1c868..535a929636 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -257,10 +257,10 @@ object ClusterSingletonManager { } override def postStop(): Unit = cluster.unsubscribe(self) - private val selfTeam = ClusterSettings.TeamRolePrefix + cluster.settings.Team + private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.DataCenter def matchingRole(member: Member): Boolean = - member.hasRole(selfTeam) && role.forall(member.hasRole) + member.hasRole(selfDc) && role.forall(member.hasRole) def trackChange(block: () ⇒ Unit): Unit = { val before = membersByAge.headOption diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 3e51c493f5..0c017d31dd 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -20,6 +20,7 @@ import akka.actor.NoSerializationVerificationNeeded import akka.event.Logging import akka.util.MessageBuffer import akka.cluster.ClusterSettings +import akka.cluster.ClusterSettings.DataCenter object ClusterSingletonProxySettings { @@ -64,7 +65,7 @@ object ClusterSingletonProxySettings { /** * @param singletonName The actor name of the singleton actor that is started by the [[ClusterSingletonManager]]. * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. - * @param team The team of the cluster nodes where the singleton is running. If None then the same team as current node. + * @param dataCenter The data center of the cluster nodes where the singleton is running. If None then the same data center as current node. * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. * @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages * and deliver them when the singleton is identified. When the buffer is full old messages will be dropped @@ -74,7 +75,7 @@ object ClusterSingletonProxySettings { final class ClusterSingletonProxySettings( val singletonName: String, val role: Option[String], - val team: Option[String], + val dataCenter: Option[DataCenter], val singletonIdentificationInterval: FiniteDuration, val bufferSize: Int) extends NoSerializationVerificationNeeded { @@ -94,7 +95,7 @@ final class ClusterSingletonProxySettings( def withRole(role: Option[String]): ClusterSingletonProxySettings = copy(role = role) - def withTeam(team: String): ClusterSingletonProxySettings = copy(team = Some(team)) + def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings = copy(dataCenter = Some(dataCenter)) def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings = copy(singletonIdentificationInterval = singletonIdentificationInterval) @@ -103,12 +104,12 @@ final class ClusterSingletonProxySettings( copy(bufferSize = bufferSize) private def copy( - singletonName: String = singletonName, - role: Option[String] = role, - team: Option[String] = team, - singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, - bufferSize: Int = bufferSize): ClusterSingletonProxySettings = - new ClusterSingletonProxySettings(singletonName, role, team, singletonIdentificationInterval, bufferSize) + singletonName: String = singletonName, + role: Option[String] = role, + dataCenter: Option[DataCenter] = dataCenter, + singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, + bufferSize: Int = bufferSize): ClusterSingletonProxySettings = + new ClusterSingletonProxySettings(singletonName, role, dataCenter, singletonIdentificationInterval, bufferSize) } object ClusterSingletonProxy { @@ -176,13 +177,13 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste identifyTimer = None } - private val targetTeamRole = settings.team match { - case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t - case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team + private val targetDcRole = settings.dataCenter match { + case Some(t) ⇒ ClusterSettings.DcRolePrefix + t + case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.DataCenter } def matchingRole(member: Member): Boolean = - member.hasRole(targetTeamRole) && role.forall(member.hasRole) + member.hasRole(targetDcRole) && role.forall(member.hasRole) def handleInitial(state: CurrentClusterState): Unit = { trackChange { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/MultiDcSingletonManagerSpec.scala similarity index 57% rename from akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala rename to akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/MultiDcSingletonManagerSpec.scala index 4cecea2f3c..0a37bc1749 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/MultiDcSingletonManagerSpec.scala @@ -14,7 +14,7 @@ import akka.testkit.ImplicitSender import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.cluster.ClusterSettings -object TeamSingletonManagerSpec extends MultiNodeConfig { +object MultiDcSingletonManagerSpec extends MultiNodeConfig { val controller = role("controller") val first = role("first") val second = role("second") @@ -28,44 +28,44 @@ object TeamSingletonManagerSpec extends MultiNodeConfig { nodeConfig(controller) { ConfigFactory.parseString(""" - akka.cluster.team = one + akka.cluster.data-center = one akka.cluster.roles = []""") } nodeConfig(first) { ConfigFactory.parseString(""" - akka.cluster.team = one + akka.cluster.data-center = one akka.cluster.roles = [ worker ]""") } nodeConfig(second, third) { ConfigFactory.parseString(""" - akka.cluster.team = two + akka.cluster.data-center = two akka.cluster.roles = [ worker ]""") } } -class TeamSingletonManagerMultiJvmNode1 extends TeamSingletonManagerSpec -class TeamSingletonManagerMultiJvmNode2 extends TeamSingletonManagerSpec -class TeamSingletonManagerMultiJvmNode3 extends TeamSingletonManagerSpec -class TeamSingletonManagerMultiJvmNode4 extends TeamSingletonManagerSpec +class MultiDcSingletonManagerMultiJvmNode1 extends MultiDcSingletonManagerSpec +class MultiDcSingletonManagerMultiJvmNode2 extends MultiDcSingletonManagerSpec +class MultiDcSingletonManagerMultiJvmNode3 extends MultiDcSingletonManagerSpec +class MultiDcSingletonManagerMultiJvmNode4 extends MultiDcSingletonManagerSpec -class TeamSingleton extends Actor with ActorLogging { - import TeamSingleton._ +class MultiDcSingleton extends Actor with ActorLogging { + import MultiDcSingleton._ val cluster = Cluster(context.system) override def receive: Receive = { case Ping ⇒ - sender() ! Pong(cluster.settings.Team, cluster.selfAddress, cluster.selfRoles) + sender() ! Pong(cluster.settings.DataCenter, cluster.selfAddress, cluster.selfRoles) } } -object TeamSingleton { +object MultiDcSingleton { case object Ping - case class Pong(fromTeam: String, fromAddress: Address, roles: Set[String]) + case class Pong(fromDc: String, fromAddress: Address, roles: Set[String]) } -abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { - import TeamSingletonManagerSpec._ +abstract class MultiDcSingletonManagerSpec extends MultiNodeSpec(MultiDcSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { + import MultiDcSingletonManagerSpec._ override def initialParticipants = roles.size @@ -75,13 +75,13 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag val worker = "worker" - "A SingletonManager in a team" must { - "start a singleton instance for each team" in { + "A SingletonManager in a multi data center cluster" must { + "start a singleton instance for each data center" in { runOn(first, second, third) { system.actorOf( ClusterSingletonManager.props( - Props[TeamSingleton](), + Props[MultiDcSingleton](), PoisonPill, ClusterSingletonManagerSettings(system).withRole(worker)), "singletonManager") @@ -93,33 +93,33 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag enterBarrier("managers-started") - proxy ! TeamSingleton.Ping - val pong = expectMsgType[TeamSingleton.Pong](10.seconds) + proxy ! MultiDcSingleton.Ping + val pong = expectMsgType[MultiDcSingleton.Pong](10.seconds) enterBarrier("pongs-received") - pong.fromTeam should equal(Cluster(system).settings.Team) + pong.fromDc should equal(Cluster(system).settings.DataCenter) pong.roles should contain(worker) runOn(controller, first) { - pong.roles should contain(ClusterSettings.TeamRolePrefix + "one") + pong.roles should contain(ClusterSettings.DcRolePrefix + "one") } runOn(second, third) { - pong.roles should contain(ClusterSettings.TeamRolePrefix + "two") + pong.roles should contain(ClusterSettings.DcRolePrefix + "two") } enterBarrier("after-1") } - "be able to use proxy across different team" in { + "be able to use proxy across different data centers" in { runOn(third) { val proxy = system.actorOf(ClusterSingletonProxy.props( "/user/singletonManager", - ClusterSingletonProxySettings(system).withRole(worker).withTeam("one"))) - proxy ! TeamSingleton.Ping - val pong = expectMsgType[TeamSingleton.Pong](10.seconds) - pong.fromTeam should ===("one") + ClusterSingletonProxySettings(system).withRole(worker).withDataCenter("one"))) + proxy ! MultiDcSingleton.Ping + val pong = expectMsgType[MultiDcSingleton.Pong](10.seconds) + pong.fromDc should ===("one") pong.roles should contain(worker) - pong.roles should contain(ClusterSettings.TeamRolePrefix + "one") + pong.roles should contain(ClusterSettings.DcRolePrefix + "one") } enterBarrier("after-1") } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index ad9e3414bd..2bb2bf6766 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -65,17 +65,18 @@ akka { # move 'WeaklyUp' members to 'Up' status once convergence has been reached. allow-weakly-up-members = on - # Teams are used to make islands of the cluster that are colocated. This can be used - # to make the cluster aware that it is running across multiple availability zones or regions. - # The team is added to the list of roles of the node with the prefix "team-". - team = "default" + # Defines which data center this node belongs to. It is typically used to make islands of the + # cluster that are colocated. This can be used to make the cluster aware that it is running + # across multiple availability zones or regions. It can also be used for other logical + # grouping of nodes. + data-center = "default" # The roles of this member. List of strings, e.g. roles = ["A", "B"]. # The roles are part of the membership information and can be used by # routers or other services to distribute work to certain member types, # e.g. front-end and back-end nodes. - # Roles are not allowed to start with "team-" as that is reserved for the - # special role assigned from the team a node belongs to (see above) + # Roles are not allowed to start with "dc-" as that is reserved for the + # special role assigned from the data-center a node belongs to (see above) roles = [] # Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 558f1420f8..3576cc6f0f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -421,31 +421,31 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { def logInfo(message: String): Unit = if (LogInfo) - if (settings.Team == ClusterSettings.DefaultTeam) + if (settings.DataCenter == ClusterSettings.DefaultDataCenter) log.info("Cluster Node [{}] - {}", selfAddress, message) else - log.info("Cluster Node [{}] team [{}] - {}", selfAddress, settings.Team, message) + log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.DataCenter, message) def logInfo(template: String, arg1: Any): Unit = if (LogInfo) - if (settings.Team == ClusterSettings.DefaultTeam) + if (settings.DataCenter == ClusterSettings.DefaultDataCenter) log.info("Cluster Node [{}] - " + template, selfAddress, arg1) else - log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1) + log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1) def logInfo(template: String, arg1: Any, arg2: Any): Unit = if (LogInfo) - if (settings.Team == ClusterSettings.DefaultTeam) + if (settings.DataCenter == ClusterSettings.DefaultDataCenter) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2) else - log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1, arg2) + log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1, arg2) def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = if (LogInfo) - if (settings.Team == ClusterSettings.DefaultTeam) + if (settings.DataCenter == ClusterSettings.DefaultDataCenter) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) else - log.info("Cluster Node [{}] team [" + settings.Team + "] - " + template, selfAddress, arg1, arg2, arg3) + log.info("Cluster Node [{}] dc [" + settings.DataCenter + "] - " + template, selfAddress, arg1, arg2, arg3) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b0e1e38130..ae5815c978 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -330,7 +330,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } var exitingConfirmed = Set.empty[UniqueAddress] - def selfTeam = cluster.settings.Team + def selfDc = cluster.settings.DataCenter /** * Looks up and returns the remote cluster command connection for the specific address. @@ -681,10 +681,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // send ExitingConfirmed to two potential leaders val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress) - latestGossip.leaderOf(selfTeam, membersExceptSelf, selfUniqueAddress) match { + latestGossip.leaderOf(selfDc, membersExceptSelf, selfUniqueAddress) match { case Some(node1) ⇒ clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress) - latestGossip.leaderOf(selfTeam, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match { + latestGossip.leaderOf(selfDc, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match { case Some(node2) ⇒ clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress) case None ⇒ // no more potential leader @@ -723,7 +723,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localMembers = localGossip.members val localOverview = localGossip.overview val localSeen = localOverview.seen - val localReachability = localGossip.teamReachability(selfTeam) + val localReachability = localGossip.dcReachability(selfDc) // check if the node to DOWN is in the `members` set localMembers.find(_.address == address) match { @@ -1004,11 +1004,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with * Runs periodic leader actions, such as member status transitions, assigning partitions etc. */ def leaderActions(): Unit = { - if (latestGossip.isTeamLeader(selfTeam, selfUniqueAddress, selfUniqueAddress)) { - // only run the leader actions if we are the LEADER of the team + if (latestGossip.isDcLeader(selfDc, selfUniqueAddress, selfUniqueAddress)) { + // only run the leader actions if we are the LEADER of the data center val firstNotice = 20 val periodicNotice = 60 - if (latestGossip.convergence(selfTeam, selfUniqueAddress, exitingConfirmed)) { + if (latestGossip.convergence(selfDc, selfUniqueAddress, exitingConfirmed)) { if (leaderActionCounter >= firstNotice) logInfo("Leader can perform its duties again") leaderActionCounter = 0 @@ -1021,9 +1021,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) logInfo( "Leader can currently not perform its duties, reachability status: [{}], member status: [{}]", - latestGossip.teamReachabilityExcludingDownedObservers(selfTeam), + latestGossip.dcReachabilityExcludingDownedObservers(selfDc), latestGossip.members.collect { - case m if m.team == selfTeam ⇒ + case m if m.dataCenter == selfDc ⇒ s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}" }.mkString(", ")) } @@ -1036,8 +1036,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (latestGossip.member(selfUniqueAddress).status == Down) { // When all reachable have seen the state this member will shutdown itself when it has // status Down. The down commands should spread before we shutdown. - val unreachable = latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated - val downed = latestGossip.teamMembers(selfTeam).collect { case m if m.status == Down ⇒ m.uniqueAddress } + val unreachable = latestGossip.dcReachability(selfDc).allUnreachableOrTerminated + val downed = latestGossip.dcMembers(selfDc).collect { case m if m.status == Down ⇒ m.uniqueAddress } if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) { // the reason for not shutting down immediately is to give the gossip a chance to spread // the downing information to other downed nodes, so that they can shutdown themselves @@ -1072,14 +1072,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def leaderActionsOnConvergence(): Unit = { val removedUnreachable = for { - node ← latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated + node ← latestGossip.dcReachability(selfDc).allUnreachableOrTerminated m = latestGossip.member(node) - if m.team == selfTeam && Gossip.removeUnreachableWithMemberStatus(m.status) + if m.dataCenter == selfDc && Gossip.removeUnreachableWithMemberStatus(m.status) } yield m val removedExitingConfirmed = exitingConfirmed.filter { n ⇒ val member = latestGossip.member(n) - member.team == selfTeam && member.status == Exiting + member.dataCenter == selfDc && member.status == Exiting } val changedMembers = { @@ -1090,7 +1090,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with var upNumber = 0 { - case m if m.team == selfTeam && isJoiningToUp(m) ⇒ + case m if m.dataCenter == selfDc && isJoiningToUp(m) ⇒ // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) // and minimum number of nodes have joined the cluster if (upNumber == 0) { @@ -1103,7 +1103,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } m.copyUp(upNumber) - case m if m.team == selfTeam && m.status == Leaving ⇒ + case m if m.dataCenter == selfDc && m.status == Leaving ⇒ // Move LEAVING => EXITING (once we have a convergence on LEAVING) m copy (status = Exiting) } @@ -1158,10 +1158,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val enoughMembers: Boolean = isMinNrOfMembersFulfilled def isJoiningToWeaklyUp(m: Member): Boolean = - m.team == selfTeam && + m.dataCenter == selfDc && m.status == Joining && enoughMembers && - latestGossip.teamReachabilityExcludingDownedObservers(selfTeam).isReachable(m.uniqueAddress) + latestGossip.dcReachabilityExcludingDownedObservers(selfDc).isReachable(m.uniqueAddress) val changedMembers = localMembers.collect { case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp) } @@ -1269,7 +1269,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version) def validNodeForGossip(node: UniqueAddress): Boolean = - node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfTeam, node) + node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfDc, node) def updateLatestGossip(newGossip: Gossip): Unit = { // Updating the vclock version for the changes @@ -1295,7 +1295,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def publish(newGossip: Gossip): Unit = { if (cluster.settings.Debug.VerboseGossipLogging) - log.debug("Cluster Node [{}] team [{}] - New gossip published [{}]", selfAddress, cluster.settings.Team, newGossip) + log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, newGossip) publisher ! PublishChanges(newGossip) if (PublishStatsInterval == Duration.Zero) publishInternalStats() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 9b50818a6a..c2728d8eb1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -7,7 +7,7 @@ import language.postfixOps import scala.collection.immutable import scala.collection.immutable.VectorBuilder import akka.actor.{ Actor, ActorLogging, ActorRef, Address } -import akka.cluster.ClusterSettings.Team +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream @@ -58,7 +58,7 @@ object ClusterEvent { /** * Current snapshot state of the cluster. Sent to new subscriber. * - * @param leader leader of the team of this node + * @param leader leader of the data center of this node */ final case class CurrentClusterState( members: immutable.SortedSet[Member] = immutable.SortedSet.empty, @@ -88,17 +88,17 @@ object ClusterEvent { scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava /** - * Java API: get address of current team leader, or null if none + * Java API: get address of current data center leader, or null if none */ def getLeader: Address = leader orNull /** - * get address of current leader, if any, within the team that has the given role + * get address of current leader, if any, within the data center that has the given role */ def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None) /** - * Java API: get address of current leader, if any, within the team that has the given role + * Java API: get address of current leader, if any, within the data center that has the given role * or null if no such node exists */ def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull @@ -115,15 +115,15 @@ object ClusterEvent { scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava /** - * All teams in the cluster + * All data centers in the cluster */ - def allTeams: Set[String] = members.map(_.team)(breakOut) + def allDataCenters: Set[String] = members.map(_.dataCenter)(breakOut) /** - * Java API: All teams in the cluster + * Java API: All data centers in the cluster */ - def getAllTeams: java.util.Set[String] = - scala.collection.JavaConverters.setAsJavaSetConverter(allTeams).asJava + def getAllDataCenters: java.util.Set[String] = + scala.collection.JavaConverters.setAsJavaSetConverter(allDataCenters).asJava } @@ -189,7 +189,7 @@ object ClusterEvent { } /** - * Leader of the cluster team of this node changed. Published when the state change + * Leader of the cluster data center of this node changed. Published when the state change * is first seen on a node. */ final case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent { @@ -201,7 +201,8 @@ object ClusterEvent { } /** - * First member (leader) of the members within a role set (in the same team as this node, if cluster teams are used) changed. + * First member (leader) of the members within a role set (in the same data center as this node, + * if data centers are used) changed. * Published when the state change is first seen on a node. */ final case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent { @@ -318,9 +319,9 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = { - val newLeader = newGossip.teamLeader(team, selfUniqueAddress) - if (newLeader != oldGossip.teamLeader(team, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address))) + private[cluster] def diffLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = { + val newLeader = newGossip.dcLeader(dc, selfUniqueAddress) + if (newLeader != oldGossip.dcLeader(dc, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address))) else Nil } @@ -328,11 +329,11 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffRolesLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = { + private[cluster] def diffRolesLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = { for { role ← oldGossip.allRoles union newGossip.allRoles - newLeader = newGossip.roleLeader(team, role, selfUniqueAddress) - if newLeader != oldGossip.roleLeader(team, role, selfUniqueAddress) + newLeader = newGossip.roleLeader(dc, role, selfUniqueAddress) + if newLeader != oldGossip.roleLeader(dc, role, selfUniqueAddress) } yield RoleLeaderChanged(role, newLeader.map(_.address)) } @@ -340,12 +341,12 @@ object ClusterEvent { * INTERNAL API */ @InternalApi - private[cluster] def diffSeen(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] = + private[cluster] def diffSeen(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] = if (newGossip eq oldGossip) Nil else { - val newConvergence = newGossip.convergence(team, selfUniqueAddress, Set.empty) + val newConvergence = newGossip.convergence(dc, selfUniqueAddress, Set.empty) val newSeenBy = newGossip.seenBy - if (newConvergence != oldGossip.convergence(team, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy) + if (newConvergence != oldGossip.convergence(dc, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } @@ -372,7 +373,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val cluster = Cluster(context.system) val selfUniqueAddress = cluster.selfUniqueAddress var latestGossip: Gossip = Gossip.empty - def selfTeam = cluster.settings.Team + def selfDc = cluster.settings.DataCenter override def preRestart(reason: Throwable, message: Option[Any]) { // don't postStop when restarted, no children to stop @@ -407,11 +408,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto members = latestGossip.members, unreachable = unreachable, seenBy = latestGossip.seenBy.map(_.address), - leader = latestGossip.teamLeader(selfTeam, selfUniqueAddress).map(_.address), + leader = latestGossip.dcLeader(selfDc, selfUniqueAddress).map(_.address), roleLeaderMap = latestGossip.allRoles.map(r ⇒ - r → latestGossip.roleLeader(selfTeam, r, selfUniqueAddress).map(_.address) - )(collection.breakOut) - ) + r → latestGossip.roleLeader(selfDc, r, selfUniqueAddress).map(_.address))(collection.breakOut)) receiver ! state } @@ -446,10 +445,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto diffMemberEvents(oldGossip, newGossip) foreach pub diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub - diffLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub - diffRolesLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub + diffLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub + diffRolesLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub // publish internal SeenState for testing purposes - diffSeen(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub + diffSeen(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub diffReachability(oldGossip, newGossip) foreach pub } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 10f7cb309a..20799cfb11 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -109,12 +109,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def status: MemberStatus = self.status /** - * Is this node the current team leader + * Is this node the current data center leader */ def isLeader: Boolean = leader.contains(selfAddress) /** - * Get the address of the current team leader + * Get the address of the current data center leader */ def leader: Option[Address] = state.leader diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index ddc7b7717d..b7106526ca 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -18,18 +18,18 @@ import scala.concurrent.duration.FiniteDuration import akka.japi.Util.immutableSeq object ClusterSettings { - type Team = String + type DataCenter = String /** * INTERNAL API. */ @InternalApi - private[akka] val TeamRolePrefix = "team-" + private[akka] val DcRolePrefix = "dc-" /** * INTERNAL API. */ @InternalApi - private[akka] val DefaultTeam: Team = "default" + private[akka] val DefaultDataCenter: DataCenter = "default" } @@ -116,14 +116,13 @@ final class ClusterSettings(val config: Config, val systemName: String) { val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members") - val Team: Team = cc.getString("team") + val DataCenter: DataCenter = cc.getString("data-center") val Roles: Set[String] = { val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring ( - _.forall(!_.startsWith(TeamRolePrefix)), - s"Roles must not start with '${TeamRolePrefix}' as that is reserved for the cluster team setting" - ) + _.forall(!_.startsWith(DcRolePrefix)), + s"Roles must not start with '${DcRolePrefix}' as that is reserved for the cluster data-center setting") - configuredRoles + s"$TeamRolePrefix$Team" + configuredRoles + s"$DcRolePrefix$DataCenter" } val MinNrOfMembers: Int = { cc.getInt("min-nr-of-members") diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 6bc18f38a2..a5e835113c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -5,7 +5,7 @@ package akka.cluster import scala.collection.{ SortedSet, immutable } -import ClusterSettings.Team +import ClusterSettings.DataCenter import MemberStatus._ import akka.annotation.InternalApi @@ -169,32 +169,31 @@ private[cluster] final case class Gossip( } /** - * Checks if we have a cluster convergence. If there are any in team node pairs that cannot reach each other + * Checks if we have a cluster convergence. If there are any in data center node pairs that cannot reach each other * then we can't have a convergence until those nodes reach each other again or one of them is downed * * @return true if convergence have been reached and false if not */ - def convergence(team: Team, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = { - // Find cluster members in the team that are unreachable from other members of the team - // excluding observations from members outside of the team, that have status DOWN or is passed in as confirmed exiting. - val unreachableInTeam = teamReachabilityExcludingDownedObservers(team).allUnreachableOrTerminated.collect { + def convergence(dc: DataCenter, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = { + // Find cluster members in the data center that are unreachable from other members of the data center + // excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting. + val unreachableInDc = dcReachabilityExcludingDownedObservers(dc).allUnreachableOrTerminated.collect { case node if node != selfUniqueAddress && !exitingConfirmed(node) ⇒ member(node) } - // If another member in the team that is UP or LEAVING and has not seen this gossip or is exiting + // If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting // convergence cannot be reached - def teamMemberHinderingConvergenceExists = + def memberHinderingConvergenceExists = members.exists(member ⇒ - member.team == team && + member.dataCenter == dc && Gossip.convergenceMemberStatus(member.status) && - !(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress)) - ) + !(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))) - // unreachables outside of the team or with status DOWN or EXITING does not affect convergence + // unreachables outside of the data center or with status DOWN or EXITING does not affect convergence def allUnreachablesCanBeIgnored = - unreachableInTeam.forall(unreachable ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status)) + unreachableInDc.forall(unreachable ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status)) - allUnreachablesCanBeIgnored && !teamMemberHinderingConvergenceExists + allUnreachablesCanBeIgnored && !memberHinderingConvergenceExists } lazy val reachabilityExcludingDownedObservers: Reachability = { @@ -203,77 +202,77 @@ private[cluster] final case class Gossip( } /** - * @return Reachability excluding observations from nodes outside of the team, but including observed unreachable - * nodes outside of the team + * @return Reachability excluding observations from nodes outside of the data center, but including observed unreachable + * nodes outside of the data center */ - def teamReachability(team: Team): Reachability = - overview.reachability.removeObservers(members.collect { case m if m.team != team ⇒ m.uniqueAddress }) + def dcReachability(dc: DataCenter): Reachability = + overview.reachability.removeObservers(members.collect { case m if m.dataCenter != dc ⇒ m.uniqueAddress }) /** - * @return reachability for team nodes, with observations from outside the team or from downed nodes filtered out + * @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out */ - def teamReachabilityExcludingDownedObservers(team: Team): Reachability = { - val membersToExclude = members.collect { case m if m.status == Down || m.team != team ⇒ m.uniqueAddress } - overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.team != team ⇒ m.uniqueAddress }) + def dcReachabilityExcludingDownedObservers(dc: DataCenter): Reachability = { + val membersToExclude = members.collect { case m if m.status == Down || m.dataCenter != dc ⇒ m.uniqueAddress } + overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != dc ⇒ m.uniqueAddress }) } - def teamMembers(team: Team): SortedSet[Member] = - members.filter(_.team == team) + def dcMembers(dc: DataCenter): SortedSet[Member] = + members.filter(_.dataCenter == dc) - def isTeamLeader(team: Team, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean = - teamLeader(team, selfUniqueAddress).contains(node) + def isDcLeader(dc: DataCenter, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean = + dcLeader(dc, selfUniqueAddress).contains(node) - def teamLeader(team: Team, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = - leaderOf(team, members, selfUniqueAddress) + def dcLeader(dc: DataCenter, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = + leaderOf(dc, members, selfUniqueAddress) - def roleLeader(team: Team, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = - leaderOf(team, members.filter(_.hasRole(role)), selfUniqueAddress) + def roleLeader(dc: DataCenter, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = + leaderOf(dc, members.filter(_.hasRole(role)), selfUniqueAddress) - def leaderOf(team: Team, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { - val reachability = teamReachability(team) + def leaderOf(dc: DataCenter, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { + val reachability = dcReachability(dc) - val reachableTeamMembers = - if (reachability.isAllReachable) mbrs.filter(m ⇒ m.team == team && m.status != Down) + val reachableMembersInDc = + if (reachability.isAllReachable) mbrs.filter(m ⇒ m.dataCenter == dc && m.status != Down) else mbrs.filter(m ⇒ - m.team == team && + m.dataCenter == dc && m.status != Down && (reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress)) - if (reachableTeamMembers.isEmpty) None - else reachableTeamMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)) - .orElse(Some(reachableTeamMembers.min(Member.leaderStatusOrdering))) + if (reachableMembersInDc.isEmpty) None + else reachableMembersInDc.find(m ⇒ Gossip.leaderMemberStatus(m.status)) + .orElse(Some(reachableMembersInDc.min(Member.leaderStatusOrdering))) .map(_.uniqueAddress) } - def allTeams: Set[Team] = members.map(_.team) + def allDataCenters: Set[DataCenter] = members.map(_.dataCenter) def allRoles: Set[String] = members.flatMap(_.roles) def isSingletonCluster: Boolean = members.size == 1 /** - * @return true if toAddress should be reachable from the fromTeam in general, within a team - * this means only caring about team-local observations, across teams it means caring - * about all observations for the toAddress. + * @return true if toAddress should be reachable from the fromDc in general, within a data center + * this means only caring about data center local observations, across data centers it + * means caring about all observations for the toAddress. */ - def isReachableExcludingDownedObservers(fromTeam: Team, toAddress: UniqueAddress): Boolean = + def isReachableExcludingDownedObservers(fromDc: DataCenter, toAddress: UniqueAddress): Boolean = if (!hasMember(toAddress)) false else { val to = member(toAddress) - // if member is in the same team, we ignore cross-team unreachability - if (fromTeam == to.team) teamReachabilityExcludingDownedObservers(fromTeam).isReachable(toAddress) + // if member is in the same data center, we ignore cross data center unreachability + if (fromDc == to.dataCenter) dcReachabilityExcludingDownedObservers(fromDc).isReachable(toAddress) // if not it is enough that any non-downed node observed it as unreachable else reachabilityExcludingDownedObservers.isReachable(toAddress) } /** * @return true if fromAddress should be able to reach toAddress based on the unreachability data and their - * respective teams + * respective data centers */ def isReachable(fromAddress: UniqueAddress, toAddress: UniqueAddress): Boolean = if (!hasMember(toAddress)) false else { - // as it looks for specific unreachable entires for the node pair we don't have to filter on team + // as it looks for specific unreachable entires for the node pair we don't have to filter on data center overview.reachability.isReachable(fromAddress, toAddress) } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 79b6ac7b77..4fb0fbc73e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -7,7 +7,7 @@ package akka.cluster import akka.actor.Address import MemberStatus._ import akka.annotation.InternalApi -import akka.cluster.ClusterSettings.Team +import akka.cluster.ClusterSettings.DataCenter import scala.runtime.AbstractFunction2 @@ -24,9 +24,9 @@ class Member private[cluster] ( val status: MemberStatus, val roles: Set[String]) extends Serializable { - lazy val team: String = roles.find(_.startsWith(ClusterSettings.TeamRolePrefix)) - .getOrElse(throw new IllegalStateException("Team undefined, should not be possible")) - .substring(ClusterSettings.TeamRolePrefix.length) + lazy val dataCenter: DataCenter = roles.find(_.startsWith(ClusterSettings.DcRolePrefix)) + .getOrElse(throw new IllegalStateException("DataCenter undefined, should not be possible")) + .substring(ClusterSettings.DcRolePrefix.length) def address: Address = uniqueAddress.address @@ -36,10 +36,10 @@ class Member private[cluster] ( case _ ⇒ false } override def toString = - if (team == ClusterSettings.DefaultTeam) + if (dataCenter == ClusterSettings.DefaultDataCenter) s"Member(address = $address, status = $status)" else - s"Member(address = $address, team = $team, status = $status)" + s"Member(address = $address, dataCenter = $dataCenter, status = $status)" def hasRole(role: String): Boolean = roles.contains(role) @@ -54,8 +54,8 @@ class Member private[cluster] ( * member. It is only correct when comparing two existing members in a * cluster. A member that joined after removal of another member may be * considered older than the removed member. Note that is only makes - * sense to compare with other members inside of one team (upNumber has - * a higher risk of being reused across teams). + * sense to compare with other members inside of one data center (upNumber has + * a higher risk of being reused across data centers). */ def isOlderThan(other: Member): Boolean = if (upNumber == other.upNumber) @@ -97,7 +97,7 @@ object Member { * INTERNAL API */ private[cluster] def removed(node: UniqueAddress): Member = - new Member(node, Int.MaxValue, Removed, Set(ClusterSettings.TeamRolePrefix + "-N/A")) + new Member(node, Int.MaxValue, Removed, Set(ClusterSettings.DcRolePrefix + "-N/A")) /** * `Address` ordering type class, sorts addresses by host and port. diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 25a9e69eef..7af7cc9ced 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -356,11 +356,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri roleIndex ← roleIndexes role = roleMapping(roleIndex) } { - if (role.startsWith(ClusterSettings.TeamRolePrefix)) containsDc = true + if (role.startsWith(ClusterSettings.DcRolePrefix)) containsDc = true roles += role } - if (!containsDc) roles + (ClusterSettings.TeamRolePrefix + "default") + if (!containsDc) roles + (ClusterSettings.DcRolePrefix + "default") else roles } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index cb43fe4d52..f33f9823b5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -120,7 +120,7 @@ abstract class MBeanSpec | { | "address": "${sortedNodes(0)}", | "roles": [ - | "team-default", + | "dc-default", | "testNode" | ], | "status": "Up" @@ -128,7 +128,7 @@ abstract class MBeanSpec | { | "address": "${sortedNodes(1)}", | "roles": [ - | "team-default", + | "dc-default", | "testNode" | ], | "status": "Up" @@ -136,7 +136,7 @@ abstract class MBeanSpec | { | "address": "${sortedNodes(2)}", | "roles": [ - | "team-default", + | "dc-default", | "testNode" | ], | "status": "Up" @@ -144,7 +144,7 @@ abstract class MBeanSpec | { | "address": "${sortedNodes(3)}", | "roles": [ - | "team-default", + | "dc-default", | "testNode" | ], | "status": "Up" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala similarity index 70% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala index a1355f6ea9..9f49188dd4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala @@ -10,7 +10,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -object MultiTeamMultiJvmSpec extends MultiNodeConfig { +object MultiDcMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -21,32 +21,32 @@ object MultiTeamMultiJvmSpec extends MultiNodeConfig { nodeConfig(first, second)(ConfigFactory.parseString( """ - akka.cluster.team = "dc1" + akka.cluster.data-center = "dc1" akka.loglevel = INFO """)) nodeConfig(third, fourth, fifth)(ConfigFactory.parseString( """ - akka.cluster.team = "dc2" + akka.cluster.data-center = "dc2" akka.loglevel = INFO """)) testTransport(on = true) } -class MultiTeamMultiJvmNode1 extends MultiTeamSpec -class MultiTeamMultiJvmNode2 extends MultiTeamSpec -class MultiTeamMultiJvmNode3 extends MultiTeamSpec -class MultiTeamMultiJvmNode4 extends MultiTeamSpec -class MultiTeamMultiJvmNode5 extends MultiTeamSpec +class MultiDcMultiJvmNode1 extends MultiDcSpec +class MultiDcMultiJvmNode2 extends MultiDcSpec +class MultiDcMultiJvmNode3 extends MultiDcSpec +class MultiDcMultiJvmNode4 extends MultiDcSpec +class MultiDcMultiJvmNode5 extends MultiDcSpec -abstract class MultiTeamSpec - extends MultiNodeSpec(MultiTeamMultiJvmSpec) +abstract class MultiDcSpec + extends MultiNodeSpec(MultiDcMultiJvmSpec) with MultiNodeClusterSpec { - import MultiTeamMultiJvmSpec._ + import MultiDcMultiJvmSpec._ - "A cluster with multiple cluster teams" must { + "A cluster with multiple data centers" must { "be able to form" in { runOn(first) { @@ -66,31 +66,31 @@ abstract class MultiTeamSpec enterBarrier("cluster started") } - "have a leader per team" in { + "have a leader per data center" in { runOn(first, second) { - cluster.settings.Team should ===("dc1") + cluster.settings.DataCenter should ===("dc1") clusterView.leader shouldBe defined val dc1 = Set(address(first), address(second)) dc1 should contain(clusterView.leader.get) } runOn(third, fourth) { - cluster.settings.Team should ===("dc2") + cluster.settings.DataCenter should ===("dc2") clusterView.leader shouldBe defined val dc2 = Set(address(third), address(fourth)) dc2 should contain(clusterView.leader.get) } - enterBarrier("leader per team") + enterBarrier("leader per data center") } - "be able to have team member changes while there is inter-team unreachability" in within(20.seconds) { + "be able to have data center member changes while there is inter data center unreachability" in within(20.seconds) { runOn(first) { testConductor.blackhole(first, third, Direction.Both).await } runOn(first, second, third, fourth) { awaitAssert(clusterView.unreachableMembers should not be empty) } - enterBarrier("inter-team unreachability") + enterBarrier("inter-data-center unreachability") runOn(fifth) { cluster.join(third) @@ -108,17 +108,17 @@ abstract class MultiTeamSpec runOn(first, second, third, fourth) { awaitAssert(clusterView.unreachableMembers should not be empty) } - enterBarrier("inter-team unreachability end") + enterBarrier("inter-data-center unreachability end") } - "be able to have team member changes while there is unreachability in another team" in within(20.seconds) { + "be able to have data center member changes while there is unreachability in another data center" in within(20.seconds) { runOn(first) { testConductor.blackhole(first, second, Direction.Both).await } runOn(first, second, third, fourth) { awaitAssert(clusterView.unreachableMembers should not be empty) } - enterBarrier("other-team-internal-unreachable") + enterBarrier("other-data-center-internal-unreachable") runOn(third) { cluster.join(fifth) @@ -130,15 +130,15 @@ abstract class MultiTeamSpec awaitAssert(clusterView.members.collect { case m if m.status == Up ⇒ m.address } should contain(address(fifth))) } - enterBarrier("other-team-internal-unreachable changed") + enterBarrier("other-data-center-internal-unreachable changed") runOn(first) { testConductor.passThrough(first, second, Direction.Both).await } - enterBarrier("other-team-internal-unreachable end") + enterBarrier("other-datac-enter-internal-unreachable end") } - "be able to down a member of another team" in within(20.seconds) { + "be able to down a member of another data-center" in within(20.seconds) { runOn(fifth) { cluster.down(address(second)) } @@ -146,7 +146,7 @@ abstract class MultiTeamSpec runOn(first, third, fifth) { awaitAssert(clusterView.members.map(_.address) should not contain (address(second))) } - enterBarrier("cross-team-downed") + enterBarrier("cross-data-center-downed") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala similarity index 59% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala index 772f2de585..86b75dbe09 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiTeamSplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala @@ -9,7 +9,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -object MultiTeamSplitBrainMultiJvmSpec extends MultiNodeConfig { +object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -19,35 +19,35 @@ object MultiTeamSplitBrainMultiJvmSpec extends MultiNodeConfig { nodeConfig(first, second)(ConfigFactory.parseString( """ - akka.cluster.team = "dc1" + akka.cluster.data-center = "dc1" akka.loglevel = INFO """)) nodeConfig(third, fourth)(ConfigFactory.parseString( """ - akka.cluster.team = "dc2" + akka.cluster.data-center = "dc2" akka.loglevel = INFO """)) testTransport(on = true) } -class MultiTeamSplitBrainMultiJvmNode1 extends MultiTeamSpec -class MultiTeamSplitBrainMultiJvmNode2 extends MultiTeamSpec -class MultiTeamSplitBrainMultiJvmNode3 extends MultiTeamSpec -class MultiTeamSplitBrainMultiJvmNode4 extends MultiTeamSpec -class MultiTeamSplitBrainMultiJvmNode5 extends MultiTeamSpec +class MultiDcSplitBrainMultiJvmNode1 extends MultiDcSpec +class MultiDcSplitBrainMultiJvmNode2 extends MultiDcSpec +class MultiDcSplitBrainMultiJvmNode3 extends MultiDcSpec +class MultiDcSplitBrainMultiJvmNode4 extends MultiDcSpec +class MultiDcSplitBrainMultiJvmNode5 extends MultiDcSpec -abstract class MultiTeamSplitBrainSpec - extends MultiNodeSpec(MultiTeamSplitBrainMultiJvmSpec) +abstract class MultiDcSplitBrainSpec + extends MultiNodeSpec(MultiDcSplitBrainMultiJvmSpec) with MultiNodeClusterSpec { - import MultiTeamSplitBrainMultiJvmSpec._ + import MultiDcSplitBrainMultiJvmSpec._ val dc1 = List(first, second) val dc2 = List(third, fourth) - def splitTeams(): Unit = { + def splitDataCenters(): Unit = { runOn(first) { for { dc1Node ← dc1 @@ -66,7 +66,7 @@ abstract class MultiTeamSplitBrainSpec } - def unsplitTeams(): Unit = { + def unsplitDataCenters(): Unit = { runOn(first) { for { dc1Node ← dc1 @@ -79,45 +79,45 @@ abstract class MultiTeamSplitBrainSpec awaitAllReachable() } - "A cluster with multiple cluster teams" must { - "be able to form two teams" in { + "A cluster with multiple data centers" must { + "be able to form two data centers" in { awaitClusterUp(first, second, third) } - "be able to have a team member join while there is inter-team split" in within(20.seconds) { - // introduce a split between teams - splitTeams() - enterBarrier("team-split-1") + "be able to have a data center member join while there is inter data center split" in within(20.seconds) { + // introduce a split between data centers + splitDataCenters() + enterBarrier("data-center-split-1") runOn(fourth) { cluster.join(third) } - enterBarrier("inter-team unreachability") + enterBarrier("inter-data-center unreachability") // should be able to join and become up since the // split is between dc1 and dc2 runOn(third, fourth) { awaitAssert(clusterView.members.collect { - case m if m.team == "dc2" && m.status == MemberStatus.Up ⇒ m.address + case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up ⇒ m.address }) should ===(Set(address(third), address(fourth))) } enterBarrier("dc2-join-completed") - unsplitTeams() - enterBarrier("team-unsplit-1") + unsplitDataCenters() + enterBarrier("data-center-unsplit-1") runOn(dc1: _*) { awaitAssert(clusterView.members.collect { - case m if m.team == "dc2" && m.status == MemberStatus.Up ⇒ m.address + case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up ⇒ m.address }) should ===(Set(address(third), address(fourth))) } - enterBarrier("inter-team-split-1-done") + enterBarrier("inter-data-center-split-1-done") } - "be able to have team member leave while there is inter-team split" in within(20.seconds) { - splitTeams() - enterBarrier("team-split-2") + "be able to have data center member leave while there is inter data center split" in within(20.seconds) { + splitDataCenters() + enterBarrier("data-center-split-2") runOn(fourth) { cluster.leave(third) @@ -128,13 +128,13 @@ abstract class MultiTeamSplitBrainSpec } enterBarrier("node-4-left") - unsplitTeams() - enterBarrier("team-unsplit-2") + unsplitDataCenters() + enterBarrier("data-center-unsplit-2") runOn(first, second) { awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty)) } - enterBarrier("inter-team-split-2-done") + enterBarrier("inter-data-center-split-2-done") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index 9da57760ca..24322d6b21 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -91,7 +91,7 @@ abstract class QuickRestartSpec Cluster(system).state.members.size should ===(totalNumberOfNodes) Cluster(system).state.members.map(_.status == MemberStatus.Up) // use the role to test that it is the new incarnation that joined, sneaky - Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.TeamRolePrefix + "default")) + Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.DcRolePrefix + "default")) } } enterBarrier("members-up-" + n) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 3792ec722f..5bcec3aa07 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -45,8 +45,8 @@ class ClusterConfigSpec extends AkkaSpec { DownRemovalMargin should ===(Duration.Zero) MinNrOfMembers should ===(1) MinNrOfMembersOfRole should ===(Map.empty[String, Int]) - Team should ===("default") - Roles should ===(Set(ClusterSettings.TeamRolePrefix + "default")) + DataCenter should ===("default") + Roles should ===(Set(ClusterSettings.DcRolePrefix + "default")) JmxEnabled should ===(true) UseDispatcher should ===(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability should ===(0.8 +- 0.0001) @@ -61,13 +61,13 @@ class ClusterConfigSpec extends AkkaSpec { |akka { | cluster { | roles = [ "hamlet" ] - | team = "blue" + | data-center = "blue" | } |} """.stripMargin).withFallback(ConfigFactory.load()), system.name) import settings._ - Roles should ===(Set("hamlet", ClusterSettings.TeamRolePrefix + "blue")) - Team should ===("blue") + Roles should ===(Set("hamlet", ClusterSettings.DcRolePrefix + "blue")) + DataCenter should ===("blue") } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 1d78f51a85..538546c50d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -133,7 +133,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp))) subscriber.expectMsgAllOf( RoleLeaderChanged("GRP", Some(dUp.address)), - RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(dUp.address)) + RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address)) ) publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp))) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index f3c5f54ab6..785b813d44 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -52,7 +52,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining))) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for changed status of members" in { @@ -61,7 +61,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining))) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for members in unreachable" in { @@ -76,7 +76,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq(UnreachableMember(bDown))) // never include self member in unreachable diffUnreachable(g1, g2, bDown.uniqueAddress) should ===(Seq()) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq.empty) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq.empty) } "be produced for members becoming reachable after unreachable" in { @@ -104,7 +104,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(dRemoved, Exiting))) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for convergence changes" in { @@ -113,10 +113,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq.empty) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) diffMemberEvents(g2, g1) should ===(Seq.empty) diffUnreachable(g2, g1, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) } "be produced for leader changes" in { @@ -125,33 +125,33 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up))) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) - diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) - diffLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address)))) + diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) + diffLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address)))) } - "be produced for role leader changes in the same team" in { + "be produced for role leader changes in the same data center" in { val g0 = Gossip.empty val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) - diffRolesLeader(ClusterSettings.DefaultTeam, g0, g1, selfDummyAddress) should ===( + diffRolesLeader(ClusterSettings.DefaultDataCenter, g0, g1, selfDummyAddress) should ===( Set( // since this role is implicitly added - RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(aUp.address)), + RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(aUp.address)), RoleLeaderChanged("AA", Some(aUp.address)), RoleLeaderChanged("AB", Some(aUp.address)), RoleLeaderChanged("BB", Some(bUp.address)), RoleLeaderChanged("DD", Some(dLeaving.address)), RoleLeaderChanged("DE", Some(dLeaving.address)), RoleLeaderChanged("EE", Some(eUp.address)))) - diffRolesLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===( + diffRolesLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===( Set( - RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(bUp.address)), + RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(bUp.address)), RoleLeaderChanged("AA", None), RoleLeaderChanged("AB", Some(bUp.address)), RoleLeaderChanged("DE", Some(eJoining.address)))) } - "not be produced for role leader changes in other teams" in { + "not be produced for role leader changes in other data centers" in { val g0 = Gossip.empty val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 810555adfc..ab3f9a484e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -7,7 +7,7 @@ package akka.cluster import org.scalatest.WordSpec import org.scalatest.Matchers import akka.actor.Address -import akka.cluster.ClusterSettings.DefaultTeam +import akka.cluster.ClusterSettings.DefaultDataCenter import scala.collection.immutable.SortedSet @@ -27,55 +27,54 @@ class GossipSpec extends WordSpec with Matchers { val e2 = TestMember(e1.address, Up) val e3 = TestMember(e1.address, Down) - val dc1a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, team = "dc1") - val dc1b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, team = "dc1") - val dc2c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, team = "dc2") - val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, team = "dc2") - val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, team = dc2d1.team) + val dc1a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1") + val dc1b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1") + val dc2c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, dataCenter = "dc2") + val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, dataCenter = "dc2") + val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, dataCenter = dc2d1.dataCenter) "A Gossip" must { "have correct test setup" in { List(a1, a2, b1, b2, c1, c2, c3, d1, e1, e2, e3).foreach(m ⇒ - m.team should ===(DefaultTeam) - ) + m.dataCenter should ===(DefaultDataCenter)) } "reach convergence when it's empty" in { - Gossip.empty.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + Gossip.empty.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence for one node" in { val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "not reach convergence until all have seen version" in { val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(false) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(false) } "reach convergence for two nodes" in { val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence, skipping joining" in { // e1 is joining val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence, skipping down" in { // e3 is down val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence, skipping Leaving with exitingConfirmed" in { // c1 is Leaving val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) } "reach convergence, skipping unreachable Leaving with exitingConfirmed" in { @@ -83,16 +82,16 @@ class GossipSpec extends WordSpec with Matchers { val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress) val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1)) .seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) } "not reach convergence when unreachable" in { val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(false) + g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(false) // but from a1's point of view (it knows that itself is not unreachable) - g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence when downed node has observed unreachable" in { @@ -100,7 +99,7 @@ class GossipSpec extends WordSpec with Matchers { val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress) - g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(true) + g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(true) } "merge members by status priority" in { @@ -147,37 +146,37 @@ class GossipSpec extends WordSpec with Matchers { } "have leader as first member based on ordering, except Exiting status" in { - Gossip(members = SortedSet(c2, e2)).teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) - Gossip(members = SortedSet(c3, e2)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(e2.uniqueAddress)) - Gossip(members = SortedSet(c3)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(c3.uniqueAddress)) + Gossip(members = SortedSet(c2, e2)).dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) + Gossip(members = SortedSet(c3, e2)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(e2.uniqueAddress)) + Gossip(members = SortedSet(c3)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(c3.uniqueAddress)) } "have leader as first reachable member based on ordering" in { val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress) val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1)) - g1.teamLeader(DefaultTeam, e2.uniqueAddress) should ===(Some(e2.uniqueAddress)) + g1.dcLeader(DefaultDataCenter, e2.uniqueAddress) should ===(Some(e2.uniqueAddress)) // but when c2 is selfUniqueAddress - g1.teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) + g1.dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) } "not have Down member as leader" in { - Gossip(members = SortedSet(e3)).teamLeader(DefaultTeam, e3.uniqueAddress) should ===(None) + Gossip(members = SortedSet(e3)).dcLeader(DefaultDataCenter, e3.uniqueAddress) should ===(None) } - "have a leader per team" in { + "have a leader per data center" in { val g1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) // everybodys point of view is dc1a1 being leader of dc1 - g1.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.teamLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.teamLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) - g1.teamLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g1.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g1.dcLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g1.dcLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g1.dcLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) // and dc2c1 being leader of dc2 - g1.teamLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.teamLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) - g1.teamLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g1.dcLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g1.dcLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g1.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g1.dcLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) } "merge seen table correctly" in { @@ -213,20 +212,20 @@ class GossipSpec extends WordSpec with Matchers { g3.youngestMember should ===(e2) } - "reach convergence per team" in { + "reach convergence per data center" in { val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) .seen(dc1a1.uniqueAddress) .seen(dc1b1.uniqueAddress) .seen(dc2c1.uniqueAddress) .seen(dc2d1.uniqueAddress) - g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) - g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true) } - "reach convergence per team even if members of another team has not seen the gossip" in { + "reach convergence per data center even if members of another data center has not seen the gossip" in { val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) .seen(dc1a1.uniqueAddress) .seen(dc1b1.uniqueAddress) @@ -234,15 +233,15 @@ class GossipSpec extends WordSpec with Matchers { // dc2d1 has not seen the gossip // so dc1 can reach convergence - g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) // but dc2 cannot - g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false) } - "reach convergence per team even if another team contains unreachable" in { + "reach convergence per data center even if another data center contains unreachable" in { val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1)) @@ -251,16 +250,16 @@ class GossipSpec extends WordSpec with Matchers { .seen(dc2c1.uniqueAddress) .seen(dc2d1.uniqueAddress) - // this team doesn't care about dc2 having reachability problems and can reach convergence - g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + // this data center doesn't care about dc2 having reachability problems and can reach convergence + g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) - // this team is cannot reach convergence because of unreachability within the team - g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + // this data center is cannot reach convergence because of unreachability within the data center + g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false) } - "reach convergence per team even if there is unreachable nodes in another team" in { + "reach convergence per data center even if there is unreachable nodes in another data center" in { val r1 = Reachability.empty .unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress) .unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) @@ -271,33 +270,33 @@ class GossipSpec extends WordSpec with Matchers { .seen(dc2c1.uniqueAddress) .seen(dc2d1.uniqueAddress) - // neither team is affected by the inter-team unreachability as far as convergence goes - g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) + // neither data center is affected by the inter data center unreachability as far as convergence goes + g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress)) g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true) - g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) + g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress)) g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true) } - "ignore cross team unreachability when determining inside of team reachability" in { + "ignore cross data center unreachability when determining inside of data center reachability" in { val r1 = Reachability.empty .unreachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress) .unreachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress) val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1)) - // inside of the teams we don't care about the cross team unreachability + // inside of the data center we don't care about the cross data center unreachability g.isReachable(dc1a1.uniqueAddress, dc1b1.uniqueAddress) should ===(true) g.isReachable(dc1b1.uniqueAddress, dc1a1.uniqueAddress) should ===(true) g.isReachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) should ===(true) g.isReachable(dc2d1.uniqueAddress, dc2c1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1a1.team, dc1b1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1b1.team, dc1a1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2c1.team, dc2d1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2d1.team, dc2c1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc1b1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc1a1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc2d1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc2c1.uniqueAddress) should ===(true) - // between teams it matters though + // between data centers it matters though g.isReachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress) should ===(false) g.isReachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress) should ===(false) // this isReachable method only says false for specific unreachable entries between the nodes @@ -305,25 +304,25 @@ class GossipSpec extends WordSpec with Matchers { g.isReachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) should ===(true) // this one looks at all unreachable-entries for the to-address - g.isReachableExcludingDownedObservers(dc1a1.team, dc2c1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc1b1.team, dc2c1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc2c1.team, dc1a1.uniqueAddress) should ===(false) - g.isReachableExcludingDownedObservers(dc2d1.team, dc1a1.uniqueAddress) should ===(false) + g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc2c1.uniqueAddress) should ===(false) + g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2c1.uniqueAddress) should ===(false) + g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc1a1.uniqueAddress) should ===(false) + g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1a1.uniqueAddress) should ===(false) // between the two other nodes there is no unreachability g.isReachable(dc1b1.uniqueAddress, dc2d1.uniqueAddress) should ===(true) g.isReachable(dc2d1.uniqueAddress, dc1b1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc1b1.team, dc2d1.uniqueAddress) should ===(true) - g.isReachableExcludingDownedObservers(dc2d1.team, dc1b1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2d1.uniqueAddress) should ===(true) + g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1b1.uniqueAddress) should ===(true) } - "not returning a downed team leader" in { + "not returning a downed data center leader" in { val g = Gossip(members = SortedSet(dc1a1.copy(Down), dc1b1)) g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1b1.uniqueAddress)) } - "ignore cross team unreachability when determining team leader" in { + "ignore cross data center unreachability when determining data center leader" in { val r1 = Reachability.empty .unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress) .unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) @@ -356,7 +355,7 @@ class GossipSpec extends WordSpec with Matchers { g.members.toList should ===(List(dc1a1, dc2d2)) } - "not reintroduce members from out-of-team gossip when merging" in { + "not reintroduce members from out-of data center gossip when merging" in { // dc1 does not know about any unreachability nor that the node has been downed val gdc1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) @@ -408,7 +407,7 @@ class GossipSpec extends WordSpec with Matchers { } "update members" in { - val joining = TestMember(Address("akka.tcp", "sys", "d", 2552), Joining, Set.empty, team = "dc2") + val joining = TestMember(Address("akka.tcp", "sys", "d", 2552), Joining, Set.empty, dataCenter = "dc2") val g = Gossip(members = SortedSet(dc1a1, joining)) g.member(joining.uniqueAddress).status should ===(Joining) diff --git a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala index 028a727f33..58b33a395f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala +++ b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala @@ -9,6 +9,6 @@ object TestMember { def apply(address: Address, status: MemberStatus): Member = apply(address, status, Set.empty) - def apply(address: Address, status: MemberStatus, roles: Set[String], team: ClusterSettings.Team = ClusterSettings.DefaultTeam): Member = - new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles + (ClusterSettings.TeamRolePrefix + team)) + def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter): Member = + new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles + (ClusterSettings.DcRolePrefix + dataCenter)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index fd4c323309..8c7a174751 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -80,25 +80,22 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) } - "add a default team role if none is present" in { + "add a default data center role if none is present" in { val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1)))) - env.gossip.members.head.roles should be(Set(ClusterSettings.TeamRolePrefix + "default")) - env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.TeamRolePrefix + "foo")) + env.gossip.members.head.roles should be(Set(ClusterSettings.DcRolePrefix + "default")) + env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.DcRolePrefix + "foo")) } } "Cluster router pool" must { "be serializable" in { checkSerialization(ClusterRouterPool( RoundRobinPool( - nrOfInstances = 4 - ), + nrOfInstances = 4), ClusterRouterPoolSettings( totalInstances = 2, maxInstancesPerNode = 5, allowLocalRoutees = true, - useRole = Some("Richard, Duke of Gloucester") - ) - )) + useRole = Some("Richard, Duke of Gloucester")))) } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 151ef0a5ed..ca79c9be89 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -1134,7 +1134,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog override def preStart(): Unit = { if (hasDurableKeys) durableStore ! LoadAll - // not using LeaderChanged/RoleLeaderChanged because here we need one node independent of team + // not using LeaderChanged/RoleLeaderChanged because here we need one node independent of data center cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[ReachabilityEvent]) } diff --git a/project/MiMa.scala b/project/MiMa.scala index 5c55c43c01..73bf9c5897 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1239,7 +1239,7 @@ object MiMa extends AutoPlugin { // older versions will be missing the method. We accept that incompatibility for now. ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate"), - // #23228 single leader per cluster team + // #23228 single leader per cluster data center ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.copy"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.this"),