Rename team to data center, #23275

This commit is contained in:
Patrik Nordwall 2017-07-04 17:11:21 +02:00
parent e0fe0bc49e
commit bb9549263e
28 changed files with 382 additions and 385 deletions

View file

@ -30,6 +30,7 @@ import akka.cluster.ddata.Replicator
import scala.util.control.NonFatal
import akka.actor.Status
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
/**
* This extension provides sharding functionality of actors in a cluster.
@ -343,7 +344,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
role: Option[String],
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef =
startProxy(typeName, role, team = None, extractEntityId, extractShardId)
startProxy(typeName, role, dataCenter = None, extractEntityId, extractShardId)
/**
* Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
@ -357,8 +358,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* @param typeName the name of the entity type
* @param role specifies that this entity type is located on cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param team The team of the cluster nodes where the cluster sharding is running.
* If None then the same team as current node.
* @param dataCenter The data center of the cluster nodes where the cluster sharding is running.
* If None then the same data center as current node.
* @param extractEntityId partial function to extract the entity id and the message to send to the
* entity from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
@ -369,21 +370,21 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def startProxy(
typeName: String,
role: Option[String],
team: Option[String],
dataCenter: Option[DataCenter],
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, team, settings, extractEntityId, extractShardId)
val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
// it must be possible to start several proxies, one per team
regions.put(proxyName(typeName, team), shardRegion)
// it must be possible to start several proxies, one per data center
regions.put(proxyName(typeName, dataCenter), shardRegion)
shardRegion
}
private def proxyName(typeName: String, team: Option[String]): String = {
team match {
private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = {
dataCenter match {
case None typeName
case Some(t) typeName + "-" + t
}
@ -409,7 +410,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
typeName: String,
role: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef =
startProxy(typeName, role, team = Optional.empty(), messageExtractor)
startProxy(typeName, role, dataCenter = Optional.empty(), messageExtractor)
/**
* Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
@ -423,8 +424,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* @param typeName the name of the entity type
* @param role specifies that this entity type is located on cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param team The team of the cluster nodes where the cluster sharding is running.
* If None then the same team as current node.
* @param dataCenter The data center of the cluster nodes where the cluster sharding is running.
* If None then the same data center as current node.
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
* entity from the incoming message
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
@ -432,10 +433,10 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def startProxy(
typeName: String,
role: Optional[String],
team: Optional[String],
dataCenter: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
startProxy(typeName, Option(role.orElse(null)), Option(team.orElse(null)),
startProxy(typeName, Option(role.orElse(null)), Option(dataCenter.orElse(null)),
extractEntityId = {
case msg if messageExtractor.entityId(msg) ne null
(messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
@ -456,13 +457,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
/**
* Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the
* named entity type running in another team. A proxy within the same team can be accessed
* named entity type running in another data center. A proxy within the same data center can be accessed
* with [[#shardRegion]] instead of this method. The entity type must be registered with the
* [[#startProxy]] method before it can be used here. Messages to the entity is always sent
* via the `ShardRegion`.
*/
def shardRegionProxy(typeName: String, team: String): ActorRef = {
regions.get(proxyName(typeName, Some(team))) match {
def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef = {
regions.get(proxyName(typeName, Some(dataCenter))) match {
case null throw new IllegalArgumentException(s"Shard type [$typeName] must be started first")
case ref ref
}
@ -479,7 +480,7 @@ private[akka] object ClusterShardingGuardian {
extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId,
allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any)
extends NoSerializationVerificationNeeded
final case class StartProxy(typeName: String, team: Option[String], settings: ClusterShardingSettings,
final case class StartProxy(typeName: String, dataCenter: Option[DataCenter], settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId)
extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
@ -518,8 +519,8 @@ private[akka] class ClusterShardingGuardian extends Actor {
case Some(r) URLEncoder.encode(r, ByteString.UTF_8) + "Replicator"
case None "replicator"
}
// Use members within the team and with the given role (if any)
val replicatorRoles = Set(ClusterSettings.TeamRolePrefix + cluster.settings.Team) ++ settings.role
// Use members within the data center and with the given role (if any)
val replicatorRoles = Set(ClusterSettings.DcRolePrefix + cluster.settings.DataCenter) ++ settings.role
val ref = context.actorOf(Replicator.props(replicatorSettings.withRoles(replicatorRoles)), name)
replicatorByRole = replicatorByRole.updated(settings.role, ref)
ref
@ -584,14 +585,14 @@ private[akka] class ClusterShardingGuardian extends Actor {
sender() ! Status.Failure(e)
}
case StartProxy(typeName, team, settings, extractEntityId, extractShardId)
case StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
try {
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
// it must be possible to start several proxies, one per team
val actorName = team match {
// it must be possible to start several proxies, one per data center
val actorName = dataCenter match {
case None encName
case Some(t) URLEncoder.encode(typeName + "-" + t, ByteString.UTF_8)
}
@ -599,7 +600,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
context.actorOf(
ShardRegion.proxyProps(
typeName = typeName,
team = team,
dataCenter = dataCenter,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,

View file

@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import scala.concurrent.Promise
import akka.Done
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@ -41,7 +42,7 @@ object ShardRegion {
handOffStopMessage: Any,
replicator: ActorRef,
majorityMinCap: Int): Props =
Props(new ShardRegion(typeName, Some(entityProps), team = None, settings, coordinatorPath, extractEntityId,
Props(new ShardRegion(typeName, Some(entityProps), dataCenter = None, settings, coordinatorPath, extractEntityId,
extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
/**
@ -51,14 +52,14 @@ object ShardRegion {
*/
private[akka] def proxyProps(
typeName: String,
team: Option[String],
dataCenter: Option[DataCenter],
settings: ClusterShardingSettings,
coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
replicator: ActorRef,
majorityMinCap: Int): Props =
Props(new ShardRegion(typeName, None, team, settings, coordinatorPath, extractEntityId, extractShardId,
Props(new ShardRegion(typeName, None, dataCenter, settings, coordinatorPath, extractEntityId, extractShardId,
PoisonPill, replicator, majorityMinCap)).withDeploy(Deploy.local)
/**
@ -367,7 +368,7 @@ object ShardRegion {
private[akka] class ShardRegion(
typeName: String,
entityProps: Option[Props],
team: Option[String],
dataCenter: Option[DataCenter],
settings: ClusterShardingSettings,
coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId,
@ -422,14 +423,14 @@ private[akka] class ShardRegion(
retryTask.cancel()
}
// when using proxy the team can be different that the own team
private val targetTeamRole = team match {
case Some(t) ClusterSettings.TeamRolePrefix + t
case None ClusterSettings.TeamRolePrefix + cluster.settings.Team
// when using proxy the data center can be different from the own data center
private val targetDcRole = dataCenter match {
case Some(t) ClusterSettings.DcRolePrefix + t
case None ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
}
def matchingRole(member: Member): Boolean =
member.hasRole(targetTeamRole) && role.forall(member.hasRole)
member.hasRole(targetDcRole) && role.forall(member.hasRole)
def coordinatorSelection: Option[ActorSelection] =
membersByAge.headOption.map(m context.actorSelection(RootActorPath(m.address) + coordinatorPath))

View file

@ -460,7 +460,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
val proxy = system.actorOf(
ShardRegion.proxyProps(
typeName = "counter",
team = None,
dataCenter = None,
settings,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
extractEntityId = extractEntityId,

View file

@ -21,7 +21,7 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object TeamClusterShardingSpec {
object MultiDcClusterShardingSpec {
sealed trait EntityMsg {
def id: String
}
@ -48,7 +48,7 @@ object TeamClusterShardingSpec {
}
}
object TeamClusterShardingSpecConfig extends MultiNodeConfig {
object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
@ -62,23 +62,23 @@ object TeamClusterShardingSpecConfig extends MultiNodeConfig {
"""))
nodeConfig(first, second) {
ConfigFactory.parseString("akka.cluster.team = DC1")
ConfigFactory.parseString("akka.cluster.data-center = DC1")
}
nodeConfig(third, fourth) {
ConfigFactory.parseString("akka.cluster.team = DC2")
ConfigFactory.parseString("akka.cluster.data-center = DC2")
}
}
class TeamClusterShardingMultiJvmNode1 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode2 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode3 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode4 extends TeamClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode1 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode2 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode3 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode4 extends MultiDcClusterShardingSpec
abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterShardingSpecConfig)
abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig)
with STMultiNodeSpec with ImplicitSender {
import TeamClusterShardingSpec._
import TeamClusterShardingSpecConfig._
import MultiDcClusterShardingSpec._
import MultiDcClusterShardingSpecConfig._
override def initialParticipants = roles.size
@ -119,7 +119,7 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding
}, 10.seconds)
}
s"Cluster sharding with teams" must {
s"Cluster sharding in multi data center cluster" must {
"join cluster" in within(20.seconds) {
join(first, first)
join(second, first)
@ -171,7 +171,7 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding
enterBarrier("after-2")
}
"not mix entities in different teams" in {
"not mix entities in different data centers" in {
runOn(second) {
region ! GetCount("5")
expectMsg(1)
@ -183,12 +183,12 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding
enterBarrier("after-3")
}
"allow proxy within same team" in {
"allow proxy within same data center" in {
runOn(second) {
val proxy = ClusterSharding(system).startProxy(
typeName = "Entity",
role = None,
team = None, // by default use own team
dataCenter = None, // by default use own DC
extractEntityId = extractEntityId,
extractShardId = extractShardId)
@ -198,12 +198,12 @@ abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterSharding
enterBarrier("after-4")
}
"allow proxy across different teams" in {
"allow proxy across different data centers" in {
runOn(second) {
val proxy = ClusterSharding(system).startProxy(
typeName = "Entity",
role = None,
team = Some("DC2"), // proxy to other DC
dataCenter = Some("DC2"), // proxy to other DC
extractEntityId = extractEntityId,
extractShardId = extractShardId)

View file

@ -257,10 +257,10 @@ object ClusterSingletonManager {
}
override def postStop(): Unit = cluster.unsubscribe(self)
private val selfTeam = ClusterSettings.TeamRolePrefix + cluster.settings.Team
private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
def matchingRole(member: Member): Boolean =
member.hasRole(selfTeam) && role.forall(member.hasRole)
member.hasRole(selfDc) && role.forall(member.hasRole)
def trackChange(block: () Unit): Unit = {
val before = membersByAge.headOption

View file

@ -20,6 +20,7 @@ import akka.actor.NoSerializationVerificationNeeded
import akka.event.Logging
import akka.util.MessageBuffer
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
object ClusterSingletonProxySettings {
@ -64,7 +65,7 @@ object ClusterSingletonProxySettings {
/**
* @param singletonName The actor name of the singleton actor that is started by the [[ClusterSingletonManager]].
* @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do.
* @param team The team of the cluster nodes where the singleton is running. If None then the same team as current node.
* @param dataCenter The data center of the cluster nodes where the singleton is running. If None then the same data center as current node.
* @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance.
* @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages
* and deliver them when the singleton is identified. When the buffer is full old messages will be dropped
@ -74,7 +75,7 @@ object ClusterSingletonProxySettings {
final class ClusterSingletonProxySettings(
val singletonName: String,
val role: Option[String],
val team: Option[String],
val dataCenter: Option[DataCenter],
val singletonIdentificationInterval: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {
@ -94,7 +95,7 @@ final class ClusterSingletonProxySettings(
def withRole(role: Option[String]): ClusterSingletonProxySettings = copy(role = role)
def withTeam(team: String): ClusterSingletonProxySettings = copy(team = Some(team))
def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings = copy(dataCenter = Some(dataCenter))
def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings =
copy(singletonIdentificationInterval = singletonIdentificationInterval)
@ -105,10 +106,10 @@ final class ClusterSingletonProxySettings(
private def copy(
singletonName: String = singletonName,
role: Option[String] = role,
team: Option[String] = team,
dataCenter: Option[DataCenter] = dataCenter,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
bufferSize: Int = bufferSize): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(singletonName, role, team, singletonIdentificationInterval, bufferSize)
new ClusterSingletonProxySettings(singletonName, role, dataCenter, singletonIdentificationInterval, bufferSize)
}
object ClusterSingletonProxy {
@ -176,13 +177,13 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
identifyTimer = None
}
private val targetTeamRole = settings.team match {
case Some(t) ClusterSettings.TeamRolePrefix + t
case None ClusterSettings.TeamRolePrefix + cluster.settings.Team
private val targetDcRole = settings.dataCenter match {
case Some(t) ClusterSettings.DcRolePrefix + t
case None ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
}
def matchingRole(member: Member): Boolean =
member.hasRole(targetTeamRole) && role.forall(member.hasRole)
member.hasRole(targetDcRole) && role.forall(member.hasRole)
def handleInitial(state: CurrentClusterState): Unit = {
trackChange {

View file

@ -14,7 +14,7 @@ import akka.testkit.ImplicitSender
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.cluster.ClusterSettings
object TeamSingletonManagerSpec extends MultiNodeConfig {
object MultiDcSingletonManagerSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
@ -28,44 +28,44 @@ object TeamSingletonManagerSpec extends MultiNodeConfig {
nodeConfig(controller) {
ConfigFactory.parseString("""
akka.cluster.team = one
akka.cluster.data-center = one
akka.cluster.roles = []""")
}
nodeConfig(first) {
ConfigFactory.parseString("""
akka.cluster.team = one
akka.cluster.data-center = one
akka.cluster.roles = [ worker ]""")
}
nodeConfig(second, third) {
ConfigFactory.parseString("""
akka.cluster.team = two
akka.cluster.data-center = two
akka.cluster.roles = [ worker ]""")
}
}
class TeamSingletonManagerMultiJvmNode1 extends TeamSingletonManagerSpec
class TeamSingletonManagerMultiJvmNode2 extends TeamSingletonManagerSpec
class TeamSingletonManagerMultiJvmNode3 extends TeamSingletonManagerSpec
class TeamSingletonManagerMultiJvmNode4 extends TeamSingletonManagerSpec
class MultiDcSingletonManagerMultiJvmNode1 extends MultiDcSingletonManagerSpec
class MultiDcSingletonManagerMultiJvmNode2 extends MultiDcSingletonManagerSpec
class MultiDcSingletonManagerMultiJvmNode3 extends MultiDcSingletonManagerSpec
class MultiDcSingletonManagerMultiJvmNode4 extends MultiDcSingletonManagerSpec
class TeamSingleton extends Actor with ActorLogging {
import TeamSingleton._
class MultiDcSingleton extends Actor with ActorLogging {
import MultiDcSingleton._
val cluster = Cluster(context.system)
override def receive: Receive = {
case Ping
sender() ! Pong(cluster.settings.Team, cluster.selfAddress, cluster.selfRoles)
sender() ! Pong(cluster.settings.DataCenter, cluster.selfAddress, cluster.selfRoles)
}
}
object TeamSingleton {
object MultiDcSingleton {
case object Ping
case class Pong(fromTeam: String, fromAddress: Address, roles: Set[String])
case class Pong(fromDc: String, fromAddress: Address, roles: Set[String])
}
abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {
import TeamSingletonManagerSpec._
abstract class MultiDcSingletonManagerSpec extends MultiNodeSpec(MultiDcSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {
import MultiDcSingletonManagerSpec._
override def initialParticipants = roles.size
@ -75,13 +75,13 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag
val worker = "worker"
"A SingletonManager in a team" must {
"start a singleton instance for each team" in {
"A SingletonManager in a multi data center cluster" must {
"start a singleton instance for each data center" in {
runOn(first, second, third) {
system.actorOf(
ClusterSingletonManager.props(
Props[TeamSingleton](),
Props[MultiDcSingleton](),
PoisonPill,
ClusterSingletonManagerSettings(system).withRole(worker)),
"singletonManager")
@ -93,33 +93,33 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag
enterBarrier("managers-started")
proxy ! TeamSingleton.Ping
val pong = expectMsgType[TeamSingleton.Pong](10.seconds)
proxy ! MultiDcSingleton.Ping
val pong = expectMsgType[MultiDcSingleton.Pong](10.seconds)
enterBarrier("pongs-received")
pong.fromTeam should equal(Cluster(system).settings.Team)
pong.fromDc should equal(Cluster(system).settings.DataCenter)
pong.roles should contain(worker)
runOn(controller, first) {
pong.roles should contain(ClusterSettings.TeamRolePrefix + "one")
pong.roles should contain(ClusterSettings.DcRolePrefix + "one")
}
runOn(second, third) {
pong.roles should contain(ClusterSettings.TeamRolePrefix + "two")
pong.roles should contain(ClusterSettings.DcRolePrefix + "two")
}
enterBarrier("after-1")
}
"be able to use proxy across different team" in {
"be able to use proxy across different data centers" in {
runOn(third) {
val proxy = system.actorOf(ClusterSingletonProxy.props(
"/user/singletonManager",
ClusterSingletonProxySettings(system).withRole(worker).withTeam("one")))
proxy ! TeamSingleton.Ping
val pong = expectMsgType[TeamSingleton.Pong](10.seconds)
pong.fromTeam should ===("one")
ClusterSingletonProxySettings(system).withRole(worker).withDataCenter("one")))
proxy ! MultiDcSingleton.Ping
val pong = expectMsgType[MultiDcSingleton.Pong](10.seconds)
pong.fromDc should ===("one")
pong.roles should contain(worker)
pong.roles should contain(ClusterSettings.TeamRolePrefix + "one")
pong.roles should contain(ClusterSettings.DcRolePrefix + "one")
}
enterBarrier("after-1")
}

View file

@ -65,17 +65,18 @@ akka {
# move 'WeaklyUp' members to 'Up' status once convergence has been reached.
allow-weakly-up-members = on
# Teams are used to make islands of the cluster that are colocated. This can be used
# to make the cluster aware that it is running across multiple availability zones or regions.
# The team is added to the list of roles of the node with the prefix "team-".
team = "default"
# Defines which data center this node belongs to. It is typically used to make islands of the
# cluster that are colocated. This can be used to make the cluster aware that it is running
# across multiple availability zones or regions. It can also be used for other logical
# grouping of nodes.
data-center = "default"
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
# The roles are part of the membership information and can be used by
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
# Roles are not allowed to start with "team-" as that is reserved for the
# special role assigned from the team a node belongs to (see above)
# Roles are not allowed to start with "dc-" as that is reserved for the
# special role assigned from the data-center a node belongs to (see above)
roles = []
# Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster

View file

@ -421,31 +421,31 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
def logInfo(message: String): Unit =
if (LogInfo)
if (settings.Team == ClusterSettings.DefaultTeam)
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - {}", selfAddress, message)
else
log.info("Cluster Node [{}] team [{}] - {}", selfAddress, settings.Team, message)
log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.DataCenter, message)
def logInfo(template: String, arg1: Any): Unit =
if (LogInfo)
if (settings.Team == ClusterSettings.DefaultTeam)
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1)
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo)
if (settings.Team == ClusterSettings.DefaultTeam)
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.info("Cluster Node [{}] team [{}] - " + template, selfAddress, settings.Team, arg1, arg2)
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1, arg2)
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (LogInfo)
if (settings.Team == ClusterSettings.DefaultTeam)
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.info("Cluster Node [{}] team [" + settings.Team + "] - " + template, selfAddress, arg1, arg2, arg3)
log.info("Cluster Node [{}] dc [" + settings.DataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
}
}

View file

@ -330,7 +330,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
var exitingConfirmed = Set.empty[UniqueAddress]
def selfTeam = cluster.settings.Team
def selfDc = cluster.settings.DataCenter
/**
* Looks up and returns the remote cluster command connection for the specific address.
@ -681,10 +681,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// send ExitingConfirmed to two potential leaders
val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress)
latestGossip.leaderOf(selfTeam, membersExceptSelf, selfUniqueAddress) match {
latestGossip.leaderOf(selfDc, membersExceptSelf, selfUniqueAddress) match {
case Some(node1)
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
latestGossip.leaderOf(selfTeam, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
latestGossip.leaderOf(selfDc, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
case Some(node2)
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None // no more potential leader
@ -723,7 +723,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localReachability = localGossip.teamReachability(selfTeam)
val localReachability = localGossip.dcReachability(selfDc)
// check if the node to DOWN is in the `members` set
localMembers.find(_.address == address) match {
@ -1004,11 +1004,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
*/
def leaderActions(): Unit = {
if (latestGossip.isTeamLeader(selfTeam, selfUniqueAddress, selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the team
if (latestGossip.isDcLeader(selfDc, selfUniqueAddress, selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the data center
val firstNotice = 20
val periodicNotice = 60
if (latestGossip.convergence(selfTeam, selfUniqueAddress, exitingConfirmed)) {
if (latestGossip.convergence(selfDc, selfUniqueAddress, exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@ -1021,9 +1021,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
latestGossip.teamReachabilityExcludingDownedObservers(selfTeam),
latestGossip.dcReachabilityExcludingDownedObservers(selfDc),
latestGossip.members.collect {
case m if m.team == selfTeam
case m if m.dataCenter == selfDc
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
}.mkString(", "))
}
@ -1036,8 +1036,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
// status Down. The down commands should spread before we shutdown.
val unreachable = latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
val downed = latestGossip.teamMembers(selfTeam).collect { case m if m.status == Down m.uniqueAddress }
val unreachable = latestGossip.dcReachability(selfDc).allUnreachableOrTerminated
val downed = latestGossip.dcMembers(selfDc).collect { case m if m.status == Down m.uniqueAddress }
if (downed.forall(node unreachable(node) || latestGossip.seenByNode(node))) {
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
@ -1072,14 +1072,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def leaderActionsOnConvergence(): Unit = {
val removedUnreachable = for {
node latestGossip.teamReachability(selfTeam).allUnreachableOrTerminated
node latestGossip.dcReachability(selfDc).allUnreachableOrTerminated
m = latestGossip.member(node)
if m.team == selfTeam && Gossip.removeUnreachableWithMemberStatus(m.status)
if m.dataCenter == selfDc && Gossip.removeUnreachableWithMemberStatus(m.status)
} yield m
val removedExitingConfirmed = exitingConfirmed.filter { n
val member = latestGossip.member(n)
member.team == selfTeam && member.status == Exiting
member.dataCenter == selfDc && member.status == Exiting
}
val changedMembers = {
@ -1090,7 +1090,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
var upNumber = 0
{
case m if m.team == selfTeam && isJoiningToUp(m)
case m if m.dataCenter == selfDc && isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
@ -1103,7 +1103,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
m.copyUp(upNumber)
case m if m.team == selfTeam && m.status == Leaving
case m if m.dataCenter == selfDc && m.status == Leaving
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m copy (status = Exiting)
}
@ -1158,10 +1158,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToWeaklyUp(m: Member): Boolean =
m.team == selfTeam &&
m.dataCenter == selfDc &&
m.status == Joining &&
enoughMembers &&
latestGossip.teamReachabilityExcludingDownedObservers(selfTeam).isReachable(m.uniqueAddress)
latestGossip.dcReachabilityExcludingDownedObservers(selfDc).isReachable(m.uniqueAddress)
val changedMembers = localMembers.collect {
case m if isJoiningToWeaklyUp(m) m.copy(status = WeaklyUp)
}
@ -1269,7 +1269,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfTeam, node)
node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfDc, node)
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
@ -1295,7 +1295,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def publish(newGossip: Gossip): Unit = {
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] team [{}] - New gossip published [{}]", selfAddress, cluster.settings.Team, newGossip)
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, newGossip)
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()

View file

@ -7,7 +7,7 @@ import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterSettings.Team
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
@ -58,7 +58,7 @@ object ClusterEvent {
/**
* Current snapshot state of the cluster. Sent to new subscriber.
*
* @param leader leader of the team of this node
* @param leader leader of the data center of this node
*/
final case class CurrentClusterState(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
@ -88,17 +88,17 @@ object ClusterEvent {
scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava
/**
* Java API: get address of current team leader, or null if none
* Java API: get address of current data center leader, or null if none
*/
def getLeader: Address = leader orNull
/**
* get address of current leader, if any, within the team that has the given role
* get address of current leader, if any, within the data center that has the given role
*/
def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
/**
* Java API: get address of current leader, if any, within the team that has the given role
* Java API: get address of current leader, if any, within the data center that has the given role
* or null if no such node exists
*/
def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
@ -115,15 +115,15 @@ object ClusterEvent {
scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava
/**
* All teams in the cluster
* All data centers in the cluster
*/
def allTeams: Set[String] = members.map(_.team)(breakOut)
def allDataCenters: Set[String] = members.map(_.dataCenter)(breakOut)
/**
* Java API: All teams in the cluster
* Java API: All data centers in the cluster
*/
def getAllTeams: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(allTeams).asJava
def getAllDataCenters: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(allDataCenters).asJava
}
@ -189,7 +189,7 @@ object ClusterEvent {
}
/**
* Leader of the cluster team of this node changed. Published when the state change
* Leader of the cluster data center of this node changed. Published when the state change
* is first seen on a node.
*/
final case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
@ -201,7 +201,8 @@ object ClusterEvent {
}
/**
* First member (leader) of the members within a role set (in the same team as this node, if cluster teams are used) changed.
* First member (leader) of the members within a role set (in the same data center as this node,
* if data centers are used) changed.
* Published when the state change is first seen on a node.
*/
final case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
@ -318,9 +319,9 @@ object ClusterEvent {
* INTERNAL API
*/
@InternalApi
private[cluster] def diffLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = {
val newLeader = newGossip.teamLeader(team, selfUniqueAddress)
if (newLeader != oldGossip.teamLeader(team, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address)))
private[cluster] def diffLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = {
val newLeader = newGossip.dcLeader(dc, selfUniqueAddress)
if (newLeader != oldGossip.dcLeader(dc, selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address)))
else Nil
}
@ -328,11 +329,11 @@ object ClusterEvent {
* INTERNAL API
*/
@InternalApi
private[cluster] def diffRolesLeader(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
private[cluster] def diffRolesLeader(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
for {
role oldGossip.allRoles union newGossip.allRoles
newLeader = newGossip.roleLeader(team, role, selfUniqueAddress)
if newLeader != oldGossip.roleLeader(team, role, selfUniqueAddress)
newLeader = newGossip.roleLeader(dc, role, selfUniqueAddress)
if newLeader != oldGossip.roleLeader(dc, role, selfUniqueAddress)
} yield RoleLeaderChanged(role, newLeader.map(_.address))
}
@ -340,12 +341,12 @@ object ClusterEvent {
* INTERNAL API
*/
@InternalApi
private[cluster] def diffSeen(team: Team, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
private[cluster] def diffSeen(dc: DataCenter, oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
if (newGossip eq oldGossip) Nil
else {
val newConvergence = newGossip.convergence(team, selfUniqueAddress, Set.empty)
val newConvergence = newGossip.convergence(dc, selfUniqueAddress, Set.empty)
val newSeenBy = newGossip.seenBy
if (newConvergence != oldGossip.convergence(team, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy)
if (newConvergence != oldGossip.convergence(dc, selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy)
List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
else Nil
}
@ -372,7 +373,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val cluster = Cluster(context.system)
val selfUniqueAddress = cluster.selfUniqueAddress
var latestGossip: Gossip = Gossip.empty
def selfTeam = cluster.settings.Team
def selfDc = cluster.settings.DataCenter
override def preRestart(reason: Throwable, message: Option[Any]) {
// don't postStop when restarted, no children to stop
@ -407,11 +408,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
members = latestGossip.members,
unreachable = unreachable,
seenBy = latestGossip.seenBy.map(_.address),
leader = latestGossip.teamLeader(selfTeam, selfUniqueAddress).map(_.address),
leader = latestGossip.dcLeader(selfDc, selfUniqueAddress).map(_.address),
roleLeaderMap = latestGossip.allRoles.map(r
r latestGossip.roleLeader(selfTeam, r, selfUniqueAddress).map(_.address)
)(collection.breakOut)
)
r latestGossip.roleLeader(selfDc, r, selfUniqueAddress).map(_.address))(collection.breakOut))
receiver ! state
}
@ -446,10 +445,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
diffMemberEvents(oldGossip, newGossip) foreach pub
diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
diffLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffRolesLeader(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffRolesLeader(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub
// publish internal SeenState for testing purposes
diffSeen(selfTeam, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffSeen(selfDc, oldGossip, newGossip, selfUniqueAddress) foreach pub
diffReachability(oldGossip, newGossip) foreach pub
}

View file

@ -109,12 +109,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def status: MemberStatus = self.status
/**
* Is this node the current team leader
* Is this node the current data center leader
*/
def isLeader: Boolean = leader.contains(selfAddress)
/**
* Get the address of the current team leader
* Get the address of the current data center leader
*/
def leader: Option[Address] = state.leader

View file

@ -18,18 +18,18 @@ import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
object ClusterSettings {
type Team = String
type DataCenter = String
/**
* INTERNAL API.
*/
@InternalApi
private[akka] val TeamRolePrefix = "team-"
private[akka] val DcRolePrefix = "dc-"
/**
* INTERNAL API.
*/
@InternalApi
private[akka] val DefaultTeam: Team = "default"
private[akka] val DefaultDataCenter: DataCenter = "default"
}
@ -116,14 +116,13 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val Team: Team = cc.getString("team")
val DataCenter: DataCenter = cc.getString("data-center")
val Roles: Set[String] = {
val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
_.forall(!_.startsWith(TeamRolePrefix)),
s"Roles must not start with '${TeamRolePrefix}' as that is reserved for the cluster team setting"
)
_.forall(!_.startsWith(DcRolePrefix)),
s"Roles must not start with '${DcRolePrefix}' as that is reserved for the cluster data-center setting")
configuredRoles + s"$TeamRolePrefix$Team"
configuredRoles + s"$DcRolePrefix$DataCenter"
}
val MinNrOfMembers: Int = {
cc.getInt("min-nr-of-members")

View file

@ -5,7 +5,7 @@
package akka.cluster
import scala.collection.{ SortedSet, immutable }
import ClusterSettings.Team
import ClusterSettings.DataCenter
import MemberStatus._
import akka.annotation.InternalApi
@ -169,32 +169,31 @@ private[cluster] final case class Gossip(
}
/**
* Checks if we have a cluster convergence. If there are any in team node pairs that cannot reach each other
* Checks if we have a cluster convergence. If there are any in data center node pairs that cannot reach each other
* then we can't have a convergence until those nodes reach each other again or one of them is downed
*
* @return true if convergence have been reached and false if not
*/
def convergence(team: Team, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = {
// Find cluster members in the team that are unreachable from other members of the team
// excluding observations from members outside of the team, that have status DOWN or is passed in as confirmed exiting.
val unreachableInTeam = teamReachabilityExcludingDownedObservers(team).allUnreachableOrTerminated.collect {
def convergence(dc: DataCenter, selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = {
// Find cluster members in the data center that are unreachable from other members of the data center
// excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting.
val unreachableInDc = dcReachabilityExcludingDownedObservers(dc).allUnreachableOrTerminated.collect {
case node if node != selfUniqueAddress && !exitingConfirmed(node) member(node)
}
// If another member in the team that is UP or LEAVING and has not seen this gossip or is exiting
// If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting
// convergence cannot be reached
def teamMemberHinderingConvergenceExists =
def memberHinderingConvergenceExists =
members.exists(member
member.team == team &&
member.dataCenter == dc &&
Gossip.convergenceMemberStatus(member.status) &&
!(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))
)
!(seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress)))
// unreachables outside of the team or with status DOWN or EXITING does not affect convergence
// unreachables outside of the data center or with status DOWN or EXITING does not affect convergence
def allUnreachablesCanBeIgnored =
unreachableInTeam.forall(unreachable Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status))
unreachableInDc.forall(unreachable Gossip.convergenceSkipUnreachableWithMemberStatus(unreachable.status))
allUnreachablesCanBeIgnored && !teamMemberHinderingConvergenceExists
allUnreachablesCanBeIgnored && !memberHinderingConvergenceExists
}
lazy val reachabilityExcludingDownedObservers: Reachability = {
@ -203,77 +202,77 @@ private[cluster] final case class Gossip(
}
/**
* @return Reachability excluding observations from nodes outside of the team, but including observed unreachable
* nodes outside of the team
* @return Reachability excluding observations from nodes outside of the data center, but including observed unreachable
* nodes outside of the data center
*/
def teamReachability(team: Team): Reachability =
overview.reachability.removeObservers(members.collect { case m if m.team != team m.uniqueAddress })
def dcReachability(dc: DataCenter): Reachability =
overview.reachability.removeObservers(members.collect { case m if m.dataCenter != dc m.uniqueAddress })
/**
* @return reachability for team nodes, with observations from outside the team or from downed nodes filtered out
* @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out
*/
def teamReachabilityExcludingDownedObservers(team: Team): Reachability = {
val membersToExclude = members.collect { case m if m.status == Down || m.team != team m.uniqueAddress }
overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.team != team m.uniqueAddress })
def dcReachabilityExcludingDownedObservers(dc: DataCenter): Reachability = {
val membersToExclude = members.collect { case m if m.status == Down || m.dataCenter != dc m.uniqueAddress }
overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != dc m.uniqueAddress })
}
def teamMembers(team: Team): SortedSet[Member] =
members.filter(_.team == team)
def dcMembers(dc: DataCenter): SortedSet[Member] =
members.filter(_.dataCenter == dc)
def isTeamLeader(team: Team, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean =
teamLeader(team, selfUniqueAddress).contains(node)
def isDcLeader(dc: DataCenter, node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean =
dcLeader(dc, selfUniqueAddress).contains(node)
def teamLeader(team: Team, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
leaderOf(team, members, selfUniqueAddress)
def dcLeader(dc: DataCenter, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
leaderOf(dc, members, selfUniqueAddress)
def roleLeader(team: Team, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
leaderOf(team, members.filter(_.hasRole(role)), selfUniqueAddress)
def roleLeader(dc: DataCenter, role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
leaderOf(dc, members.filter(_.hasRole(role)), selfUniqueAddress)
def leaderOf(team: Team, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
val reachability = teamReachability(team)
def leaderOf(dc: DataCenter, mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
val reachability = dcReachability(dc)
val reachableTeamMembers =
if (reachability.isAllReachable) mbrs.filter(m m.team == team && m.status != Down)
val reachableMembersInDc =
if (reachability.isAllReachable) mbrs.filter(m m.dataCenter == dc && m.status != Down)
else mbrs.filter(m
m.team == team &&
m.dataCenter == dc &&
m.status != Down &&
(reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress))
if (reachableTeamMembers.isEmpty) None
else reachableTeamMembers.find(m Gossip.leaderMemberStatus(m.status))
.orElse(Some(reachableTeamMembers.min(Member.leaderStatusOrdering)))
if (reachableMembersInDc.isEmpty) None
else reachableMembersInDc.find(m Gossip.leaderMemberStatus(m.status))
.orElse(Some(reachableMembersInDc.min(Member.leaderStatusOrdering)))
.map(_.uniqueAddress)
}
def allTeams: Set[Team] = members.map(_.team)
def allDataCenters: Set[DataCenter] = members.map(_.dataCenter)
def allRoles: Set[String] = members.flatMap(_.roles)
def isSingletonCluster: Boolean = members.size == 1
/**
* @return true if toAddress should be reachable from the fromTeam in general, within a team
* this means only caring about team-local observations, across teams it means caring
* about all observations for the toAddress.
* @return true if toAddress should be reachable from the fromDc in general, within a data center
* this means only caring about data center local observations, across data centers it
* means caring about all observations for the toAddress.
*/
def isReachableExcludingDownedObservers(fromTeam: Team, toAddress: UniqueAddress): Boolean =
def isReachableExcludingDownedObservers(fromDc: DataCenter, toAddress: UniqueAddress): Boolean =
if (!hasMember(toAddress)) false
else {
val to = member(toAddress)
// if member is in the same team, we ignore cross-team unreachability
if (fromTeam == to.team) teamReachabilityExcludingDownedObservers(fromTeam).isReachable(toAddress)
// if member is in the same data center, we ignore cross data center unreachability
if (fromDc == to.dataCenter) dcReachabilityExcludingDownedObservers(fromDc).isReachable(toAddress)
// if not it is enough that any non-downed node observed it as unreachable
else reachabilityExcludingDownedObservers.isReachable(toAddress)
}
/**
* @return true if fromAddress should be able to reach toAddress based on the unreachability data and their
* respective teams
* respective data centers
*/
def isReachable(fromAddress: UniqueAddress, toAddress: UniqueAddress): Boolean =
if (!hasMember(toAddress)) false
else {
// as it looks for specific unreachable entires for the node pair we don't have to filter on team
// as it looks for specific unreachable entires for the node pair we don't have to filter on data center
overview.reachability.isReachable(fromAddress, toAddress)
}

View file

@ -7,7 +7,7 @@ package akka.cluster
import akka.actor.Address
import MemberStatus._
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.Team
import akka.cluster.ClusterSettings.DataCenter
import scala.runtime.AbstractFunction2
@ -24,9 +24,9 @@ class Member private[cluster] (
val status: MemberStatus,
val roles: Set[String]) extends Serializable {
lazy val team: String = roles.find(_.startsWith(ClusterSettings.TeamRolePrefix))
.getOrElse(throw new IllegalStateException("Team undefined, should not be possible"))
.substring(ClusterSettings.TeamRolePrefix.length)
lazy val dataCenter: DataCenter = roles.find(_.startsWith(ClusterSettings.DcRolePrefix))
.getOrElse(throw new IllegalStateException("DataCenter undefined, should not be possible"))
.substring(ClusterSettings.DcRolePrefix.length)
def address: Address = uniqueAddress.address
@ -36,10 +36,10 @@ class Member private[cluster] (
case _ false
}
override def toString =
if (team == ClusterSettings.DefaultTeam)
if (dataCenter == ClusterSettings.DefaultDataCenter)
s"Member(address = $address, status = $status)"
else
s"Member(address = $address, team = $team, status = $status)"
s"Member(address = $address, dataCenter = $dataCenter, status = $status)"
def hasRole(role: String): Boolean = roles.contains(role)
@ -54,8 +54,8 @@ class Member private[cluster] (
* member. It is only correct when comparing two existing members in a
* cluster. A member that joined after removal of another member may be
* considered older than the removed member. Note that is only makes
* sense to compare with other members inside of one team (upNumber has
* a higher risk of being reused across teams).
* sense to compare with other members inside of one data center (upNumber has
* a higher risk of being reused across data centers).
*/
def isOlderThan(other: Member): Boolean =
if (upNumber == other.upNumber)
@ -97,7 +97,7 @@ object Member {
* INTERNAL API
*/
private[cluster] def removed(node: UniqueAddress): Member =
new Member(node, Int.MaxValue, Removed, Set(ClusterSettings.TeamRolePrefix + "-N/A"))
new Member(node, Int.MaxValue, Removed, Set(ClusterSettings.DcRolePrefix + "-N/A"))
/**
* `Address` ordering type class, sorts addresses by host and port.

View file

@ -356,11 +356,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
roleIndex roleIndexes
role = roleMapping(roleIndex)
} {
if (role.startsWith(ClusterSettings.TeamRolePrefix)) containsDc = true
if (role.startsWith(ClusterSettings.DcRolePrefix)) containsDc = true
roles += role
}
if (!containsDc) roles + (ClusterSettings.TeamRolePrefix + "default")
if (!containsDc) roles + (ClusterSettings.DcRolePrefix + "default")
else roles
}

View file

@ -120,7 +120,7 @@ abstract class MBeanSpec
| {
| "address": "${sortedNodes(0)}",
| "roles": [
| "team-default",
| "dc-default",
| "testNode"
| ],
| "status": "Up"
@ -128,7 +128,7 @@ abstract class MBeanSpec
| {
| "address": "${sortedNodes(1)}",
| "roles": [
| "team-default",
| "dc-default",
| "testNode"
| ],
| "status": "Up"
@ -136,7 +136,7 @@ abstract class MBeanSpec
| {
| "address": "${sortedNodes(2)}",
| "roles": [
| "team-default",
| "dc-default",
| "testNode"
| ],
| "status": "Up"
@ -144,7 +144,7 @@ abstract class MBeanSpec
| {
| "address": "${sortedNodes(3)}",
| "roles": [
| "team-default",
| "dc-default",
| "testNode"
| ],
| "status": "Up"

View file

@ -10,7 +10,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object MultiTeamMultiJvmSpec extends MultiNodeConfig {
object MultiDcMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
@ -21,32 +21,32 @@ object MultiTeamMultiJvmSpec extends MultiNodeConfig {
nodeConfig(first, second)(ConfigFactory.parseString(
"""
akka.cluster.team = "dc1"
akka.cluster.data-center = "dc1"
akka.loglevel = INFO
"""))
nodeConfig(third, fourth, fifth)(ConfigFactory.parseString(
"""
akka.cluster.team = "dc2"
akka.cluster.data-center = "dc2"
akka.loglevel = INFO
"""))
testTransport(on = true)
}
class MultiTeamMultiJvmNode1 extends MultiTeamSpec
class MultiTeamMultiJvmNode2 extends MultiTeamSpec
class MultiTeamMultiJvmNode3 extends MultiTeamSpec
class MultiTeamMultiJvmNode4 extends MultiTeamSpec
class MultiTeamMultiJvmNode5 extends MultiTeamSpec
class MultiDcMultiJvmNode1 extends MultiDcSpec
class MultiDcMultiJvmNode2 extends MultiDcSpec
class MultiDcMultiJvmNode3 extends MultiDcSpec
class MultiDcMultiJvmNode4 extends MultiDcSpec
class MultiDcMultiJvmNode5 extends MultiDcSpec
abstract class MultiTeamSpec
extends MultiNodeSpec(MultiTeamMultiJvmSpec)
abstract class MultiDcSpec
extends MultiNodeSpec(MultiDcMultiJvmSpec)
with MultiNodeClusterSpec {
import MultiTeamMultiJvmSpec._
import MultiDcMultiJvmSpec._
"A cluster with multiple cluster teams" must {
"A cluster with multiple data centers" must {
"be able to form" in {
runOn(first) {
@ -66,31 +66,31 @@ abstract class MultiTeamSpec
enterBarrier("cluster started")
}
"have a leader per team" in {
"have a leader per data center" in {
runOn(first, second) {
cluster.settings.Team should ===("dc1")
cluster.settings.DataCenter should ===("dc1")
clusterView.leader shouldBe defined
val dc1 = Set(address(first), address(second))
dc1 should contain(clusterView.leader.get)
}
runOn(third, fourth) {
cluster.settings.Team should ===("dc2")
cluster.settings.DataCenter should ===("dc2")
clusterView.leader shouldBe defined
val dc2 = Set(address(third), address(fourth))
dc2 should contain(clusterView.leader.get)
}
enterBarrier("leader per team")
enterBarrier("leader per data center")
}
"be able to have team member changes while there is inter-team unreachability" in within(20.seconds) {
"be able to have data center member changes while there is inter data center unreachability" in within(20.seconds) {
runOn(first) {
testConductor.blackhole(first, third, Direction.Both).await
}
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("inter-team unreachability")
enterBarrier("inter-data-center unreachability")
runOn(fifth) {
cluster.join(third)
@ -108,17 +108,17 @@ abstract class MultiTeamSpec
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("inter-team unreachability end")
enterBarrier("inter-data-center unreachability end")
}
"be able to have team member changes while there is unreachability in another team" in within(20.seconds) {
"be able to have data center member changes while there is unreachability in another data center" in within(20.seconds) {
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
}
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("other-team-internal-unreachable")
enterBarrier("other-data-center-internal-unreachable")
runOn(third) {
cluster.join(fifth)
@ -130,15 +130,15 @@ abstract class MultiTeamSpec
awaitAssert(clusterView.members.collect { case m if m.status == Up m.address } should contain(address(fifth)))
}
enterBarrier("other-team-internal-unreachable changed")
enterBarrier("other-data-center-internal-unreachable changed")
runOn(first) {
testConductor.passThrough(first, second, Direction.Both).await
}
enterBarrier("other-team-internal-unreachable end")
enterBarrier("other-datac-enter-internal-unreachable end")
}
"be able to down a member of another team" in within(20.seconds) {
"be able to down a member of another data-center" in within(20.seconds) {
runOn(fifth) {
cluster.down(address(second))
}
@ -146,7 +146,7 @@ abstract class MultiTeamSpec
runOn(first, third, fifth) {
awaitAssert(clusterView.members.map(_.address) should not contain (address(second)))
}
enterBarrier("cross-team-downed")
enterBarrier("cross-data-center-downed")
}
}

View file

@ -9,7 +9,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object MultiTeamSplitBrainMultiJvmSpec extends MultiNodeConfig {
object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
@ -19,35 +19,35 @@ object MultiTeamSplitBrainMultiJvmSpec extends MultiNodeConfig {
nodeConfig(first, second)(ConfigFactory.parseString(
"""
akka.cluster.team = "dc1"
akka.cluster.data-center = "dc1"
akka.loglevel = INFO
"""))
nodeConfig(third, fourth)(ConfigFactory.parseString(
"""
akka.cluster.team = "dc2"
akka.cluster.data-center = "dc2"
akka.loglevel = INFO
"""))
testTransport(on = true)
}
class MultiTeamSplitBrainMultiJvmNode1 extends MultiTeamSpec
class MultiTeamSplitBrainMultiJvmNode2 extends MultiTeamSpec
class MultiTeamSplitBrainMultiJvmNode3 extends MultiTeamSpec
class MultiTeamSplitBrainMultiJvmNode4 extends MultiTeamSpec
class MultiTeamSplitBrainMultiJvmNode5 extends MultiTeamSpec
class MultiDcSplitBrainMultiJvmNode1 extends MultiDcSpec
class MultiDcSplitBrainMultiJvmNode2 extends MultiDcSpec
class MultiDcSplitBrainMultiJvmNode3 extends MultiDcSpec
class MultiDcSplitBrainMultiJvmNode4 extends MultiDcSpec
class MultiDcSplitBrainMultiJvmNode5 extends MultiDcSpec
abstract class MultiTeamSplitBrainSpec
extends MultiNodeSpec(MultiTeamSplitBrainMultiJvmSpec)
abstract class MultiDcSplitBrainSpec
extends MultiNodeSpec(MultiDcSplitBrainMultiJvmSpec)
with MultiNodeClusterSpec {
import MultiTeamSplitBrainMultiJvmSpec._
import MultiDcSplitBrainMultiJvmSpec._
val dc1 = List(first, second)
val dc2 = List(third, fourth)
def splitTeams(): Unit = {
def splitDataCenters(): Unit = {
runOn(first) {
for {
dc1Node dc1
@ -66,7 +66,7 @@ abstract class MultiTeamSplitBrainSpec
}
def unsplitTeams(): Unit = {
def unsplitDataCenters(): Unit = {
runOn(first) {
for {
dc1Node dc1
@ -79,45 +79,45 @@ abstract class MultiTeamSplitBrainSpec
awaitAllReachable()
}
"A cluster with multiple cluster teams" must {
"be able to form two teams" in {
"A cluster with multiple data centers" must {
"be able to form two data centers" in {
awaitClusterUp(first, second, third)
}
"be able to have a team member join while there is inter-team split" in within(20.seconds) {
// introduce a split between teams
splitTeams()
enterBarrier("team-split-1")
"be able to have a data center member join while there is inter data center split" in within(20.seconds) {
// introduce a split between data centers
splitDataCenters()
enterBarrier("data-center-split-1")
runOn(fourth) {
cluster.join(third)
}
enterBarrier("inter-team unreachability")
enterBarrier("inter-data-center unreachability")
// should be able to join and become up since the
// split is between dc1 and dc2
runOn(third, fourth) {
awaitAssert(clusterView.members.collect {
case m if m.team == "dc2" && m.status == MemberStatus.Up m.address
case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up m.address
}) should ===(Set(address(third), address(fourth)))
}
enterBarrier("dc2-join-completed")
unsplitTeams()
enterBarrier("team-unsplit-1")
unsplitDataCenters()
enterBarrier("data-center-unsplit-1")
runOn(dc1: _*) {
awaitAssert(clusterView.members.collect {
case m if m.team == "dc2" && m.status == MemberStatus.Up m.address
case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up m.address
}) should ===(Set(address(third), address(fourth)))
}
enterBarrier("inter-team-split-1-done")
enterBarrier("inter-data-center-split-1-done")
}
"be able to have team member leave while there is inter-team split" in within(20.seconds) {
splitTeams()
enterBarrier("team-split-2")
"be able to have data center member leave while there is inter data center split" in within(20.seconds) {
splitDataCenters()
enterBarrier("data-center-split-2")
runOn(fourth) {
cluster.leave(third)
@ -128,13 +128,13 @@ abstract class MultiTeamSplitBrainSpec
}
enterBarrier("node-4-left")
unsplitTeams()
enterBarrier("team-unsplit-2")
unsplitDataCenters()
enterBarrier("data-center-unsplit-2")
runOn(first, second) {
awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
}
enterBarrier("inter-team-split-2-done")
enterBarrier("inter-data-center-split-2-done")
}
}

View file

@ -91,7 +91,7 @@ abstract class QuickRestartSpec
Cluster(system).state.members.size should ===(totalNumberOfNodes)
Cluster(system).state.members.map(_.status == MemberStatus.Up)
// use the role to test that it is the new incarnation that joined, sneaky
Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.TeamRolePrefix + "default"))
Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.DcRolePrefix + "default"))
}
}
enterBarrier("members-up-" + n)

View file

@ -45,8 +45,8 @@ class ClusterConfigSpec extends AkkaSpec {
DownRemovalMargin should ===(Duration.Zero)
MinNrOfMembers should ===(1)
MinNrOfMembersOfRole should ===(Map.empty[String, Int])
Team should ===("default")
Roles should ===(Set(ClusterSettings.TeamRolePrefix + "default"))
DataCenter should ===("default")
Roles should ===(Set(ClusterSettings.DcRolePrefix + "default"))
JmxEnabled should ===(true)
UseDispatcher should ===(Dispatchers.DefaultDispatcherId)
GossipDifferentViewProbability should ===(0.8 +- 0.0001)
@ -61,13 +61,13 @@ class ClusterConfigSpec extends AkkaSpec {
|akka {
| cluster {
| roles = [ "hamlet" ]
| team = "blue"
| data-center = "blue"
| }
|}
""".stripMargin).withFallback(ConfigFactory.load()), system.name)
import settings._
Roles should ===(Set("hamlet", ClusterSettings.TeamRolePrefix + "blue"))
Team should ===("blue")
Roles should ===(Set("hamlet", ClusterSettings.DcRolePrefix + "blue"))
DataCenter should ===("blue")
}
}
}

View file

@ -133,7 +133,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp)))
subscriber.expectMsgAllOf(
RoleLeaderChanged("GRP", Some(dUp.address)),
RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(dUp.address))
RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address))
)
publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))

View file

@ -52,7 +52,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for changed status of members" in {
@ -61,7 +61,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for members in unreachable" in {
@ -76,7 +76,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq(UnreachableMember(bDown)))
// never include self member in unreachable
diffUnreachable(g1, g2, bDown.uniqueAddress) should ===(Seq())
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq.empty)
}
"be produced for members becoming reachable after unreachable" in {
@ -104,7 +104,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(dRemoved, Exiting)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
"be produced for convergence changes" in {
@ -113,10 +113,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq.empty)
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address))))
diffMemberEvents(g2, g1) should ===(Seq.empty)
diffUnreachable(g2, g1, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g2, g1, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
}
"be produced for leader changes" in {
@ -125,33 +125,33 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address))))
diffSeen(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address))))
}
"be produced for role leader changes in the same team" in {
"be produced for role leader changes in the same data center" in {
val g0 = Gossip.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
diffRolesLeader(ClusterSettings.DefaultTeam, g0, g1, selfDummyAddress) should ===(
diffRolesLeader(ClusterSettings.DefaultDataCenter, g0, g1, selfDummyAddress) should ===(
Set(
// since this role is implicitly added
RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(aUp.address)),
RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(aUp.address)),
RoleLeaderChanged("AA", Some(aUp.address)),
RoleLeaderChanged("AB", Some(aUp.address)),
RoleLeaderChanged("BB", Some(bUp.address)),
RoleLeaderChanged("DD", Some(dLeaving.address)),
RoleLeaderChanged("DE", Some(dLeaving.address)),
RoleLeaderChanged("EE", Some(eUp.address))))
diffRolesLeader(ClusterSettings.DefaultTeam, g1, g2, selfDummyAddress) should ===(
diffRolesLeader(ClusterSettings.DefaultDataCenter, g1, g2, selfDummyAddress) should ===(
Set(
RoleLeaderChanged(ClusterSettings.TeamRolePrefix + ClusterSettings.DefaultTeam, Some(bUp.address)),
RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(bUp.address)),
RoleLeaderChanged("AA", None),
RoleLeaderChanged("AB", Some(bUp.address)),
RoleLeaderChanged("DE", Some(eJoining.address))))
}
"not be produced for role leader changes in other teams" in {
"not be produced for role leader changes in other data centers" in {
val g0 = Gossip.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))

View file

@ -7,7 +7,7 @@ package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
import akka.cluster.ClusterSettings.DefaultTeam
import akka.cluster.ClusterSettings.DefaultDataCenter
import scala.collection.immutable.SortedSet
@ -27,55 +27,54 @@ class GossipSpec extends WordSpec with Matchers {
val e2 = TestMember(e1.address, Up)
val e3 = TestMember(e1.address, Down)
val dc1a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, team = "dc1")
val dc1b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, team = "dc1")
val dc2c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, team = "dc2")
val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, team = "dc2")
val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, team = dc2d1.team)
val dc1a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1")
val dc1b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1")
val dc2c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, dataCenter = "dc2")
val dc2d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set.empty, dataCenter = "dc2")
val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, dataCenter = dc2d1.dataCenter)
"A Gossip" must {
"have correct test setup" in {
List(a1, a2, b1, b2, c1, c2, c3, d1, e1, e2, e3).foreach(m
m.team should ===(DefaultTeam)
)
m.dataCenter should ===(DefaultDataCenter))
}
"reach convergence when it's empty" in {
Gossip.empty.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
Gossip.empty.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence for one node" in {
val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"not reach convergence until all have seen version" in {
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(false)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(false)
}
"reach convergence for two nodes" in {
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping joining" in {
// e1 is joining
val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping down" in {
// e3 is down
val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping Leaving with exitingConfirmed" in {
// c1 is Leaving
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"reach convergence, skipping unreachable Leaving with exitingConfirmed" in {
@ -83,16 +82,16 @@ class GossipSpec extends WordSpec with Matchers {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress)
val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"not reach convergence when unreachable" in {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(false)
g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(false)
// but from a1's point of view (it knows that itself is not unreachable)
g1.convergence(DefaultTeam, a1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence when downed node has observed unreachable" in {
@ -100,7 +99,7 @@ class GossipSpec extends WordSpec with Matchers {
val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress)
g1.convergence(DefaultTeam, b1.uniqueAddress, Set.empty) should ===(true)
g1.convergence(DefaultDataCenter, b1.uniqueAddress, Set.empty) should ===(true)
}
"merge members by status priority" in {
@ -147,37 +146,37 @@ class GossipSpec extends WordSpec with Matchers {
}
"have leader as first member based on ordering, except Exiting status" in {
Gossip(members = SortedSet(c2, e2)).teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
Gossip(members = SortedSet(c3, e2)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(e2.uniqueAddress))
Gossip(members = SortedSet(c3)).teamLeader(DefaultTeam, c3.uniqueAddress) should ===(Some(c3.uniqueAddress))
Gossip(members = SortedSet(c2, e2)).dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
Gossip(members = SortedSet(c3, e2)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(e2.uniqueAddress))
Gossip(members = SortedSet(c3)).dcLeader(DefaultDataCenter, c3.uniqueAddress) should ===(Some(c3.uniqueAddress))
}
"have leader as first reachable member based on ordering" in {
val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress)
val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1))
g1.teamLeader(DefaultTeam, e2.uniqueAddress) should ===(Some(e2.uniqueAddress))
g1.dcLeader(DefaultDataCenter, e2.uniqueAddress) should ===(Some(e2.uniqueAddress))
// but when c2 is selfUniqueAddress
g1.teamLeader(DefaultTeam, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
g1.dcLeader(DefaultDataCenter, c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
}
"not have Down member as leader" in {
Gossip(members = SortedSet(e3)).teamLeader(DefaultTeam, e3.uniqueAddress) should ===(None)
Gossip(members = SortedSet(e3)).dcLeader(DefaultDataCenter, e3.uniqueAddress) should ===(None)
}
"have a leader per team" in {
"have a leader per data center" in {
val g1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
// everybodys point of view is dc1a1 being leader of dc1
g1.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.teamLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.teamLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.teamLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.dcLeader("dc1", dc1b1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.dcLeader("dc1", dc2c1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g1.dcLeader("dc1", dc2d1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
// and dc2c1 being leader of dc2
g1.teamLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.teamLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.teamLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.dcLeader("dc2", dc1a1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.dcLeader("dc2", dc1b1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g1.dcLeader("dc2", dc2d1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
}
"merge seen table correctly" in {
@ -213,20 +212,20 @@ class GossipSpec extends WordSpec with Matchers {
g3.youngestMember should ===(e2)
}
"reach convergence per team" in {
"reach convergence per data center" in {
val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
.seen(dc1a1.uniqueAddress)
.seen(dc1b1.uniqueAddress)
.seen(dc2c1.uniqueAddress)
.seen(dc2d1.uniqueAddress)
g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence per team even if members of another team has not seen the gossip" in {
"reach convergence per data center even if members of another data center has not seen the gossip" in {
val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
.seen(dc1a1.uniqueAddress)
.seen(dc1b1.uniqueAddress)
@ -234,15 +233,15 @@ class GossipSpec extends WordSpec with Matchers {
// dc2d1 has not seen the gossip
// so dc1 can reach convergence
g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
// but dc2 cannot
g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false)
}
"reach convergence per team even if another team contains unreachable" in {
"reach convergence per data center even if another data center contains unreachable" in {
val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress)
val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
@ -251,16 +250,16 @@ class GossipSpec extends WordSpec with Matchers {
.seen(dc2c1.uniqueAddress)
.seen(dc2d1.uniqueAddress)
// this team doesn't care about dc2 having reachability problems and can reach convergence
g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
// this data center doesn't care about dc2 having reachability problems and can reach convergence
g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
// this team is cannot reach convergence because of unreachability within the team
g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
// this data center is cannot reach convergence because of unreachability within the data center
g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(false)
}
"reach convergence per team even if there is unreachable nodes in another team" in {
"reach convergence per data center even if there is unreachable nodes in another data center" in {
val r1 = Reachability.empty
.unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress)
.unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress)
@ -271,33 +270,33 @@ class GossipSpec extends WordSpec with Matchers {
.seen(dc2c1.uniqueAddress)
.seen(dc2d1.uniqueAddress)
// neither team is affected by the inter-team unreachability as far as convergence goes
g.teamLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
// neither data center is affected by the inter data center unreachability as far as convergence goes
g.dcLeader("dc1", dc1a1.uniqueAddress) should ===(Some(dc1a1.uniqueAddress))
g.convergence("dc1", dc1a1.uniqueAddress, Set.empty) should ===(true)
g.teamLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.dcLeader("dc2", dc2c1.uniqueAddress) should ===(Some(dc2c1.uniqueAddress))
g.convergence("dc2", dc2c1.uniqueAddress, Set.empty) should ===(true)
}
"ignore cross team unreachability when determining inside of team reachability" in {
"ignore cross data center unreachability when determining inside of data center reachability" in {
val r1 = Reachability.empty
.unreachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress)
.unreachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress)
val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1), overview = GossipOverview(reachability = r1))
// inside of the teams we don't care about the cross team unreachability
// inside of the data center we don't care about the cross data center unreachability
g.isReachable(dc1a1.uniqueAddress, dc1b1.uniqueAddress) should ===(true)
g.isReachable(dc1b1.uniqueAddress, dc1a1.uniqueAddress) should ===(true)
g.isReachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) should ===(true)
g.isReachable(dc2d1.uniqueAddress, dc2c1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1a1.team, dc1b1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1b1.team, dc1a1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2c1.team, dc2d1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2d1.team, dc2c1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc1b1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc1a1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc2d1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc2c1.uniqueAddress) should ===(true)
// between teams it matters though
// between data centers it matters though
g.isReachable(dc1a1.uniqueAddress, dc2c1.uniqueAddress) should ===(false)
g.isReachable(dc2c1.uniqueAddress, dc1a1.uniqueAddress) should ===(false)
// this isReachable method only says false for specific unreachable entries between the nodes
@ -305,25 +304,25 @@ class GossipSpec extends WordSpec with Matchers {
g.isReachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress) should ===(true)
// this one looks at all unreachable-entries for the to-address
g.isReachableExcludingDownedObservers(dc1a1.team, dc2c1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc1b1.team, dc2c1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc2c1.team, dc1a1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc2d1.team, dc1a1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc1a1.dataCenter, dc2c1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2c1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc2c1.dataCenter, dc1a1.uniqueAddress) should ===(false)
g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1a1.uniqueAddress) should ===(false)
// between the two other nodes there is no unreachability
g.isReachable(dc1b1.uniqueAddress, dc2d1.uniqueAddress) should ===(true)
g.isReachable(dc2d1.uniqueAddress, dc1b1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1b1.team, dc2d1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2d1.team, dc1b1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc1b1.dataCenter, dc2d1.uniqueAddress) should ===(true)
g.isReachableExcludingDownedObservers(dc2d1.dataCenter, dc1b1.uniqueAddress) should ===(true)
}
"not returning a downed team leader" in {
"not returning a downed data center leader" in {
val g = Gossip(members = SortedSet(dc1a1.copy(Down), dc1b1))
g.leaderOf("dc1", g.members, dc1b1.uniqueAddress) should ===(Some(dc1b1.uniqueAddress))
}
"ignore cross team unreachability when determining team leader" in {
"ignore cross data center unreachability when determining data center leader" in {
val r1 = Reachability.empty
.unreachable(dc1a1.uniqueAddress, dc2d1.uniqueAddress)
.unreachable(dc2d1.uniqueAddress, dc1a1.uniqueAddress)
@ -356,7 +355,7 @@ class GossipSpec extends WordSpec with Matchers {
g.members.toList should ===(List(dc1a1, dc2d2))
}
"not reintroduce members from out-of-team gossip when merging" in {
"not reintroduce members from out-of data center gossip when merging" in {
// dc1 does not know about any unreachability nor that the node has been downed
val gdc1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1))
@ -408,7 +407,7 @@ class GossipSpec extends WordSpec with Matchers {
}
"update members" in {
val joining = TestMember(Address("akka.tcp", "sys", "d", 2552), Joining, Set.empty, team = "dc2")
val joining = TestMember(Address("akka.tcp", "sys", "d", 2552), Joining, Set.empty, dataCenter = "dc2")
val g = Gossip(members = SortedSet(dc1a1, joining))
g.member(joining.uniqueAddress).status should ===(Joining)

View file

@ -9,6 +9,6 @@ object TestMember {
def apply(address: Address, status: MemberStatus): Member =
apply(address, status, Set.empty)
def apply(address: Address, status: MemberStatus, roles: Set[String], team: ClusterSettings.Team = ClusterSettings.DefaultTeam): Member =
new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles + (ClusterSettings.TeamRolePrefix + team))
def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter): Member =
new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles + (ClusterSettings.DcRolePrefix + dataCenter))
}

View file

@ -80,25 +80,22 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
}
"add a default team role if none is present" in {
"add a default data center role if none is present" in {
val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1))))
env.gossip.members.head.roles should be(Set(ClusterSettings.TeamRolePrefix + "default"))
env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.TeamRolePrefix + "foo"))
env.gossip.members.head.roles should be(Set(ClusterSettings.DcRolePrefix + "default"))
env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.DcRolePrefix + "foo"))
}
}
"Cluster router pool" must {
"be serializable" in {
checkSerialization(ClusterRouterPool(
RoundRobinPool(
nrOfInstances = 4
),
nrOfInstances = 4),
ClusterRouterPoolSettings(
totalInstances = 2,
maxInstancesPerNode = 5,
allowLocalRoutees = true,
useRole = Some("Richard, Duke of Gloucester")
)
))
useRole = Some("Richard, Duke of Gloucester"))))
}
}

View file

@ -1134,7 +1134,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def preStart(): Unit = {
if (hasDurableKeys)
durableStore ! LoadAll
// not using LeaderChanged/RoleLeaderChanged because here we need one node independent of team
// not using LeaderChanged/RoleLeaderChanged because here we need one node independent of data center
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[ReachabilityEvent])
}

View file

@ -1239,7 +1239,7 @@ object MiMa extends AutoPlugin {
// older versions will be missing the method. We accept that incompatibility for now.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate"),
// #23228 single leader per cluster team
// #23228 single leader per cluster data center
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.this"),