parent
be5a0207bb
commit
9c7e8d027a
16 changed files with 48 additions and 46 deletions
|
|
@ -520,7 +520,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|||
case None ⇒ "replicator"
|
||||
}
|
||||
// Use members within the data center and with the given role (if any)
|
||||
val replicatorRoles = Set(ClusterSettings.DcRolePrefix + cluster.settings.DataCenter) ++ settings.role
|
||||
val replicatorRoles = Set(ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter) ++ settings.role
|
||||
val ref = context.actorOf(Replicator.props(replicatorSettings.withRoles(replicatorRoles)), name)
|
||||
replicatorByRole = replicatorByRole.updated(settings.role, ref)
|
||||
ref
|
||||
|
|
|
|||
|
|
@ -426,7 +426,7 @@ private[akka] class ShardRegion(
|
|||
// when using proxy the data center can be different from the own data center
|
||||
private val targetDcRole = dataCenter match {
|
||||
case Some(t) ⇒ ClusterSettings.DcRolePrefix + t
|
||||
case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
|
||||
case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter
|
||||
}
|
||||
|
||||
def matchingRole(member: Member): Boolean =
|
||||
|
|
|
|||
|
|
@ -62,11 +62,11 @@ object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
|
|||
"""))
|
||||
|
||||
nodeConfig(first, second) {
|
||||
ConfigFactory.parseString("akka.cluster.data-center = DC1")
|
||||
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1")
|
||||
}
|
||||
|
||||
nodeConfig(third, fourth) {
|
||||
ConfigFactory.parseString("akka.cluster.data-center = DC2")
|
||||
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC2")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ object ClusterSingletonManager {
|
|||
}
|
||||
override def postStop(): Unit = cluster.unsubscribe(self)
|
||||
|
||||
private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
|
||||
private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter
|
||||
|
||||
def matchingRole(member: Member): Boolean =
|
||||
member.hasRole(selfDc) && role.forall(member.hasRole)
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
|||
|
||||
private val targetDcRole = settings.dataCenter match {
|
||||
case Some(t) ⇒ ClusterSettings.DcRolePrefix + t
|
||||
case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.DataCenter
|
||||
case None ⇒ ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter
|
||||
}
|
||||
|
||||
def matchingRole(member: Member): Boolean =
|
||||
|
|
|
|||
|
|
@ -28,18 +28,18 @@ object MultiDcSingletonManagerSpec extends MultiNodeConfig {
|
|||
|
||||
nodeConfig(controller) {
|
||||
ConfigFactory.parseString("""
|
||||
akka.cluster.data-center = one
|
||||
akka.cluster.multi-data-center.self-data-center = one
|
||||
akka.cluster.roles = []""")
|
||||
}
|
||||
|
||||
nodeConfig(first) {
|
||||
ConfigFactory.parseString("""
|
||||
akka.cluster.data-center = one
|
||||
akka.cluster.multi-data-center.self-data-center = one
|
||||
akka.cluster.roles = [ worker ]""")
|
||||
}
|
||||
nodeConfig(second, third) {
|
||||
ConfigFactory.parseString("""
|
||||
akka.cluster.data-center = two
|
||||
akka.cluster.multi-data-center.self-data-center = two
|
||||
akka.cluster.roles = [ worker ]""")
|
||||
}
|
||||
}
|
||||
|
|
@ -56,7 +56,7 @@ class MultiDcSingleton extends Actor with ActorLogging {
|
|||
|
||||
override def receive: Receive = {
|
||||
case Ping ⇒
|
||||
sender() ! Pong(cluster.settings.DataCenter, cluster.selfAddress, cluster.selfRoles)
|
||||
sender() ! Pong(cluster.settings.SelfDataCenter, cluster.selfAddress, cluster.selfRoles)
|
||||
}
|
||||
}
|
||||
object MultiDcSingleton {
|
||||
|
|
@ -98,7 +98,7 @@ abstract class MultiDcSingletonManagerSpec extends MultiNodeSpec(MultiDcSingleto
|
|||
|
||||
enterBarrier("pongs-received")
|
||||
|
||||
pong.fromDc should equal(Cluster(system).settings.DataCenter)
|
||||
pong.fromDc should equal(Cluster(system).settings.SelfDataCenter)
|
||||
pong.roles should contain(worker)
|
||||
runOn(controller, first) {
|
||||
pong.roles should contain(ClusterSettings.DcRolePrefix + "one")
|
||||
|
|
|
|||
|
|
@ -65,18 +65,13 @@ akka {
|
|||
# move 'WeaklyUp' members to 'Up' status once convergence has been reached.
|
||||
allow-weakly-up-members = on
|
||||
|
||||
# Defines which data center this node belongs to. It is typically used to make islands of the
|
||||
# cluster that are colocated. This can be used to make the cluster aware that it is running
|
||||
# across multiple availability zones or regions. It can also be used for other logical
|
||||
# grouping of nodes.
|
||||
data-center = "default"
|
||||
|
||||
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
|
||||
# The roles are part of the membership information and can be used by
|
||||
# routers or other services to distribute work to certain member types,
|
||||
# e.g. front-end and back-end nodes.
|
||||
# Roles are not allowed to start with "dc-" as that is reserved for the
|
||||
# special role assigned from the data-center a node belongs to (see above)
|
||||
# special role assigned from the data-center a node belongs to (see the
|
||||
# multi-data-center section below)
|
||||
roles = []
|
||||
|
||||
# Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster
|
||||
|
|
@ -211,6 +206,13 @@ akka {
|
|||
# if your cluster nodes are configured with at-least 2 different `akka.cluster.data-center` values.
|
||||
multi-data-center {
|
||||
|
||||
# Defines which data center this node belongs to. It is typically used to make islands of the
|
||||
# cluster that are colocated. This can be used to make the cluster aware that it is running
|
||||
# across multiple availability zones or regions. It can also be used for other logical
|
||||
# grouping of nodes.
|
||||
self-data-center = "default"
|
||||
|
||||
|
||||
# Try to limit the number of connections between data centers. Used for gossip and heartbeating.
|
||||
# This will not limit connections created for the messaging of the application.
|
||||
# If the cluster does not span multiple data centers, this value has no effect.
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
def selfAddress: Address = selfUniqueAddress.address
|
||||
|
||||
/** Data center to which this node belongs to (defaults to "default" if not configured explicitly) */
|
||||
def selfDataCenter: DataCenter = settings.DataCenter
|
||||
def selfDataCenter: DataCenter = settings.SelfDataCenter
|
||||
|
||||
/**
|
||||
* roles that this member has
|
||||
|
|
@ -434,31 +434,31 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
|
||||
def logInfo(message: String): Unit =
|
||||
if (LogInfo)
|
||||
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
|
||||
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
|
||||
log.info("Cluster Node [{}] - {}", selfAddress, message)
|
||||
else
|
||||
log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.DataCenter, message)
|
||||
log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
|
||||
|
||||
def logInfo(template: String, arg1: Any): Unit =
|
||||
if (LogInfo)
|
||||
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
|
||||
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
|
||||
log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
|
||||
else
|
||||
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1)
|
||||
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
|
||||
|
||||
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
|
||||
if (LogInfo)
|
||||
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
|
||||
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
|
||||
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
|
||||
else
|
||||
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.DataCenter, arg1, arg2)
|
||||
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
|
||||
|
||||
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
|
||||
if (LogInfo)
|
||||
if (settings.DataCenter == ClusterSettings.DefaultDataCenter)
|
||||
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
|
||||
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
|
||||
else
|
||||
log.info("Cluster Node [{}] dc [" + settings.DataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
|
||||
log.info("Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -299,7 +299,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
var membershipState = MembershipState(
|
||||
Gossip.empty,
|
||||
cluster.selfUniqueAddress,
|
||||
cluster.settings.DataCenter,
|
||||
cluster.settings.SelfDataCenter,
|
||||
cluster.settings.MultiDataCenter.CrossDcConnections)
|
||||
|
||||
def latestGossip: Gossip = membershipState.latestGossip
|
||||
|
|
@ -1230,7 +1230,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
def publishMembershipState(): Unit = {
|
||||
if (cluster.settings.Debug.VerboseGossipLogging)
|
||||
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, membershipState.latestGossip)
|
||||
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.SelfDataCenter, membershipState.latestGossip)
|
||||
|
||||
publisher ! PublishChanges(membershipState)
|
||||
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
|
||||
|
|
|
|||
|
|
@ -378,10 +378,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
val emptyMembershipState = MembershipState(
|
||||
Gossip.empty,
|
||||
cluster.selfUniqueAddress,
|
||||
cluster.settings.DataCenter,
|
||||
cluster.settings.SelfDataCenter,
|
||||
cluster.settings.MultiDataCenter.CrossDcConnections)
|
||||
var membershipState: MembershipState = emptyMembershipState
|
||||
def selfDc = cluster.settings.DataCenter
|
||||
def selfDc = cluster.settings.SelfDataCenter
|
||||
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
// don't postStop when restarted, no children to stop
|
||||
|
|
|
|||
|
|
@ -137,14 +137,14 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
|
||||
val AllowWeaklyUpMembers: Boolean = cc.getBoolean("allow-weakly-up-members")
|
||||
|
||||
val DataCenter: DataCenter = cc.getString("data-center")
|
||||
val SelfDataCenter: DataCenter = cc.getString("multi-data-center.self-data-center")
|
||||
|
||||
val Roles: Set[String] = {
|
||||
val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
|
||||
_.forall(!_.startsWith(DcRolePrefix)),
|
||||
s"Roles must not start with '${DcRolePrefix}' as that is reserved for the cluster data-center setting")
|
||||
s"Roles must not start with '${DcRolePrefix}' as that is reserved for the cluster self-data-center setting")
|
||||
|
||||
configuredRoles + s"$DcRolePrefix$DataCenter"
|
||||
configuredRoles + s"$DcRolePrefix$SelfDataCenter"
|
||||
}
|
||||
|
||||
val MinNrOfMembers: Int = {
|
||||
|
|
|
|||
|
|
@ -25,12 +25,12 @@ class MultiDcSpecConfig(crossDcConnections: Int = 5) extends MultiNodeConfig {
|
|||
|
||||
nodeConfig(first, second)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.data-center = "dc1"
|
||||
akka.cluster.multi-data-center.self-data-center = "dc1"
|
||||
"""))
|
||||
|
||||
nodeConfig(third, fourth, fifth)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.data-center = "dc2"
|
||||
akka.cluster.multi-data-center.self-data-center = "dc2"
|
||||
"""))
|
||||
|
||||
testTransport(on = true)
|
||||
|
|
@ -80,13 +80,13 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
|
|||
|
||||
"have a leader per data center" in {
|
||||
runOn(first, second) {
|
||||
cluster.settings.DataCenter should ===("dc1")
|
||||
cluster.settings.SelfDataCenter should ===("dc1")
|
||||
clusterView.leader shouldBe defined
|
||||
val dc1 = Set(address(first), address(second))
|
||||
dc1 should contain(clusterView.leader.get)
|
||||
}
|
||||
runOn(third, fourth) {
|
||||
cluster.settings.DataCenter should ===("dc2")
|
||||
cluster.settings.SelfDataCenter should ===("dc2")
|
||||
clusterView.leader shouldBe defined
|
||||
val dc2 = Set(address(third), address(fourth))
|
||||
dc2 should contain(clusterView.leader.get)
|
||||
|
|
|
|||
|
|
@ -24,12 +24,12 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
nodeConfig(first, second)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.data-center = "dc1"
|
||||
akka.cluster.multi-data-center.self-data-center = "dc1"
|
||||
"""))
|
||||
|
||||
nodeConfig(third, fourth)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.data-center = "dc2"
|
||||
akka.cluster.multi-data-center.self-data-center = "dc2"
|
||||
"""))
|
||||
|
||||
testTransport(on = true)
|
||||
|
|
|
|||
|
|
@ -23,14 +23,14 @@ object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig {
|
|||
nodeConfig(first, second, third)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka {
|
||||
cluster.data-center = alpha
|
||||
cluster.multi-data-center.self-data-center = alpha
|
||||
}
|
||||
"""))
|
||||
|
||||
nodeConfig(fourth, fifth)(ConfigFactory.parseString(
|
||||
"""
|
||||
akka {
|
||||
cluster.data-center = beta
|
||||
cluster.multi-data-center.self-data-center = beta
|
||||
}
|
||||
"""))
|
||||
|
||||
|
|
|
|||
|
|
@ -306,7 +306,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
|
|||
awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
|
||||
// clusterView.leader is updated by LeaderChanged, await that to be updated also
|
||||
val expectedLeader = clusterView.members.collectFirst {
|
||||
case m if m.dataCenter == cluster.settings.DataCenter ⇒ m.address
|
||||
case m if m.dataCenter == cluster.settings.SelfDataCenter ⇒ m.address
|
||||
}
|
||||
awaitAssert(clusterView.leader should ===(expectedLeader))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
DownRemovalMargin should ===(Duration.Zero)
|
||||
MinNrOfMembers should ===(1)
|
||||
MinNrOfMembersOfRole should ===(Map.empty[String, Int])
|
||||
DataCenter should ===("default")
|
||||
SelfDataCenter should ===("default")
|
||||
Roles should ===(Set(ClusterSettings.DcRolePrefix + "default"))
|
||||
JmxEnabled should ===(true)
|
||||
UseDispatcher should ===(Dispatchers.DefaultDispatcherId)
|
||||
|
|
@ -61,13 +61,13 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
|akka {
|
||||
| cluster {
|
||||
| roles = [ "hamlet" ]
|
||||
| data-center = "blue"
|
||||
| multi-data-center.self-data-center = "blue"
|
||||
| }
|
||||
|}
|
||||
""".stripMargin).withFallback(ConfigFactory.load()), system.name)
|
||||
import settings._
|
||||
Roles should ===(Set("hamlet", ClusterSettings.DcRolePrefix + "blue"))
|
||||
DataCenter should ===("blue")
|
||||
SelfDataCenter should ===("blue")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue