Make cluster sharding DC aware, #23231

* Sharding only within own team (the coordinator is a cluster singleton)
* the ddata Replicator used by Sharding must also be scoped to the own team
* added support for a Set of roles in the ddata Replicator so that it can be used
  by sharding to specify role + team
* Sharding proxy can route to sharding in another team
Patrik Nordwall 2017-06-26 15:03:33 +02:00
parent e37243f471
commit e0fe0bc49e
12 changed files with 417 additions and 47 deletions
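
A minimal usage sketch of the resulting API (the `Entity` actor, the extractors, and the DC1/DC2 team names are illustrative, mirroring the TeamClusterShardingSpec below):

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

// illustrative entity and extractors
class Entity extends Actor {
  def receive = { case msg ⇒ sender() ! msg }
}
val extractEntityId: ShardRegion.ExtractEntityId = {
  case msg: String ⇒ (msg, msg)
}
val extractShardId: ShardRegion.ExtractShardId = {
  case msg: String ⇒ msg.take(1)
}

val system = ActorSystem("ClusterSystem") // assumes akka.cluster.team = DC1 in its config

// entities are hosted only by members of the own team (DC1);
// the coordinator is a cluster singleton within that team
val region: ActorRef = ClusterSharding(system).start(
  typeName = "Entity",
  entityProps = Props[Entity],
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId)

// a proxy-only region that routes messages to the sharding running in team DC2
val dc2Proxy: ActorRef = ClusterSharding(system).startProxy(
  typeName = "Entity",
  role = None,
  team = Some("DC2"),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId)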

View file

@@ -29,6 +29,7 @@ import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.ddata.Replicator
import scala.util.control.NonFatal
import akka.actor.Status
import akka.cluster.ClusterSettings
/**
* This extension provides sharding functionality of actors in a cluster.
@@ -341,16 +342,53 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
typeName: String,
role: Option[String],
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef =
startProxy(typeName, role, team = None, extractEntityId, extractShardId)
/**
* Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
* [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param role specifies that this entity type is located on cluster nodes with a specific role.
* If the role is not specified, all nodes in the cluster are used.
* @param team the team of the cluster nodes where the cluster sharding is running.
* If None, the team of the current node is used.
* @param extractEntityId partial function to extract the entity id and the message to send to the
* entity from the incoming message; if the partial function does not match, the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
* @param extractShardId function to determine the shard id for an incoming message, only messages
* that passed the `extractEntityId` will be used
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def startProxy(
typeName: String,
role: Option[String],
team: Option[String],
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, settings, extractEntityId, extractShardId)
val startMsg = StartProxy(typeName, team, settings, extractEntityId, extractShardId)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
// it must be possible to start several proxies, one per team
regions.put(proxyName(typeName, team), shardRegion)
shardRegion
}
private def proxyName(typeName: String, team: Option[String]): String = {
team match {
case None ⇒ typeName
case Some(t) ⇒ typeName + "-" + t
}
}
/**
* Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
@@ -370,9 +408,34 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def startProxy(
typeName: String,
role: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef =
startProxy(typeName, role, team = Optional.empty(), messageExtractor)
/**
* Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
* [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param role specifies that this entity type is located on cluster nodes with a specific role.
* If the role is not specified, all nodes in the cluster are used.
* @param team the team of the cluster nodes where the cluster sharding is running.
* If None, the team of the current node is used.
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
* entity from the incoming message
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def startProxy(
typeName: String,
role: Optional[String],
team: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
startProxy(typeName, Option(role.orElse(null)),
startProxy(typeName, Option(role.orElse(null)), Option(team.orElse(null)),
extractEntityId = {
case msg if messageExtractor.entityId(msg) ne null ⇒
(messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
@@ -383,14 +446,28 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
/**
* Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entity type.
* The entity type must be registered with the [[#start]] method before it can be used here.
* Messages to the entity are always sent via the `ShardRegion`.
* The entity type must be registered with the [[#start]] or [[#startProxy]] method before it
* can be used here. Messages to the entity are always sent via the `ShardRegion`.
*/
def shardRegion(typeName: String): ActorRef = regions.get(typeName) match {
case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first")
case ref ⇒ ref
}
/**
* Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the
* named entity type running in another team. A proxy within the same team can be accessed
* with [[#shardRegion]] instead of this method. The entity type must be registered with the
* [[#startProxy]] method before it can be used here. Messages to the entity are always sent
* via the `ShardRegion`.
*/
def shardRegionProxy(typeName: String, team: String): ActorRef = {
regions.get(proxyName(typeName, Some(team))) match {
case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first")
case ref ⇒ ref
}
}
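
The lookup is asymmetric: a proxy started with `team = None` is registered under the plain type name and is returned by `shardRegion`, while a cross-team proxy is registered under `typeName + "-" + team` (see `proxyName` above) and must be fetched with `shardRegionProxy`. A sketch, reusing the illustrative "Entity" type from the example above:

// same-team proxy (team = None) is registered under the plain type name
val sameTeamProxy: ActorRef = ClusterSharding(system).shardRegion("Entity")
// a proxy started with team = Some("DC2") is registered under "Entity-DC2"
val crossTeamProxy: ActorRef = ClusterSharding(system).shardRegionProxy("Entity", "DC2")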
}
/**
@@ -402,7 +479,7 @@ private[akka] object ClusterShardingGuardian {
extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId,
allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any)
extends NoSerializationVerificationNeeded
final case class StartProxy(typeName: String, settings: ClusterShardingSettings,
final case class StartProxy(typeName: String, team: Option[String], settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId)
extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
@@ -441,7 +518,9 @@ private[akka] class ClusterShardingGuardian extends Actor {
case Some(r) ⇒ URLEncoder.encode(r, ByteString.UTF_8) + "Replicator"
case None ⇒ "replicator"
}
val ref = context.actorOf(Replicator.props(replicatorSettings.withRole(settings.role)), name)
// Use members within the team and with the given role (if any)
val replicatorRoles = Set(ClusterSettings.TeamRolePrefix + cluster.settings.Team) ++ settings.role
val ref = context.actorOf(Replicator.props(replicatorSettings.withRoles(replicatorRoles)), name)
replicatorByRole = replicatorByRole.updated(settings.role, ref)
ref
}
@@ -505,22 +584,29 @@ private[akka] class ClusterShardingGuardian extends Actor {
sender() ! Status.Failure(e)
}
case StartProxy(typeName, settings, extractEntityId, extractShardId) ⇒
case StartProxy(typeName, team, settings, extractEntityId, extractShardId) ⇒
try {
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
// it must be possible to start several proxies, one per team
val actorName = team match {
case None ⇒ encName
case Some(t) ⇒ URLEncoder.encode(typeName + "-" + t, ByteString.UTF_8)
}
val shardRegion = context.child(actorName).getOrElse {
context.actorOf(
ShardRegion.proxyProps(
typeName = typeName,
team = team,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
replicator = context.system.deadLetters,
majorityMinCap).withDispatcher(context.props.dispatcher),
name = encName)
name = actorName)
}
sender() ! Started(shardRegion)
} catch {

View file

@@ -20,6 +20,7 @@ import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.concurrent.Promise
import akka.Done
import akka.cluster.ClusterSettings
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@@ -40,7 +41,7 @@ object ShardRegion {
handOffStopMessage: Any,
replicator: ActorRef,
majorityMinCap: Int): Props =
Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, extractEntityId,
Props(new ShardRegion(typeName, Some(entityProps), team = None, settings, coordinatorPath, extractEntityId,
extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
/**
@@ -50,13 +51,14 @@ object ShardRegion {
*/
private[akka] def proxyProps(
typeName: String,
team: Option[String],
settings: ClusterShardingSettings,
coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
replicator: ActorRef,
majorityMinCap: Int): Props =
Props(new ShardRegion(typeName, None, settings, coordinatorPath, extractEntityId, extractShardId,
Props(new ShardRegion(typeName, None, team, settings, coordinatorPath, extractEntityId, extractShardId,
PoisonPill, replicator, majorityMinCap)).withDeploy(Deploy.local)
/**
@@ -365,6 +367,7 @@ object ShardRegion {
private[akka] class ShardRegion(
typeName: String,
entityProps: Option[Props],
team: Option[String],
settings: ClusterShardingSettings,
coordinatorPath: String,
extractEntityId: ShardRegion.ExtractEntityId,
@@ -419,11 +422,15 @@ private[akka] class ShardRegion(
retryTask.cancel()
}
def matchingRole(member: Member): Boolean = role match {
case None ⇒ true
case Some(r) ⇒ member.hasRole(r)
// when using a proxy the team can be different from the own team
private val targetTeamRole = team match {
case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t
case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team
}
def matchingRole(member: Member): Boolean =
member.hasRole(targetTeamRole) && role.forall(member.hasRole)
def coordinatorSelection: Option[ActorSelection] =
membersByAge.headOption.map(m ⇒ context.actorSelection(RootActorPath(m.address) + coordinatorPath))
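
A team is thus represented as an ordinary member role with a fixed prefix, so the proxy can select target members by plain role matching. An illustrative value (the "team-" prefix is asserted by the ClusterConfigSpec change below):

// a node configured with akka.cluster.team = DC2 carries the member role "team-DC2"
val dc2TeamRole = ClusterSettings.TeamRolePrefix + "DC2" // "team-DC2"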

View file

@@ -460,6 +460,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
val proxy = system.actorOf(
ShardRegion.proxyProps(
typeName = "counter",
team = None,
settings,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
extractEntityId = extractEntityId,

View file

@@ -0,0 +1,218 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.cluster.sharding.ShardRegion.CurrentRegions
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object TeamClusterShardingSpec {
sealed trait EntityMsg {
def id: String
}
final case class Ping(id: String) extends EntityMsg
final case class GetCount(id: String) extends EntityMsg
class Entity extends Actor {
var count = 0
def receive = {
case Ping(_) ⇒
count += 1
sender() ! self
case GetCount(_) ⇒
sender() ! count
}
}
val extractEntityId: ShardRegion.ExtractEntityId = {
case m: EntityMsg ⇒ (m.id, m)
}
val extractShardId: ShardRegion.ExtractShardId = {
case m: EntityMsg ⇒ m.id.charAt(0).toString
}
}
object TeamClusterShardingSpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))
nodeConfig(first, second) {
ConfigFactory.parseString("akka.cluster.team = DC1")
}
nodeConfig(third, fourth) {
ConfigFactory.parseString("akka.cluster.team = DC2")
}
}
class TeamClusterShardingMultiJvmNode1 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode2 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode3 extends TeamClusterShardingSpec
class TeamClusterShardingMultiJvmNode4 extends TeamClusterShardingSpec
abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterShardingSpecConfig)
with STMultiNodeSpec with ImplicitSender {
import TeamClusterShardingSpec._
import TeamClusterShardingSpecConfig._
override def initialParticipants = roles.size
val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
startSharding()
within(15.seconds) {
awaitAssert(cluster.state.members.exists { m ⇒
m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
} should be(true))
}
}
enterBarrier(from.name + "-joined")
}
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entityProps = Props[Entity],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
private def fillAddress(a: Address): Address =
if (a.hasLocalScope) Cluster(system).selfAddress else a
private def assertCurrentRegions(expected: Set[Address]): Unit = {
awaitAssert({
val p = TestProbe()
region.tell(GetCurrentRegions, p.ref)
p.expectMsg(CurrentRegions(expected))
}, 10.seconds)
}
s"Cluster sharding with teams" must {
"join cluster" in within(20.seconds) {
join(first, first)
join(second, first)
join(third, first)
join(fourth, first)
awaitAssert({
Cluster(system).state.members.size should ===(4)
Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up))
}, 10.seconds)
runOn(first, second) {
assertCurrentRegions(Set(first, second).map(r ⇒ node(r).address))
}
runOn(third, fourth) {
assertCurrentRegions(Set(third, fourth).map(r ⇒ node(r).address))
}
enterBarrier("after-1")
}
"initialize shards" in {
runOn(first) {
val locations = (for (n ← 1 to 10) yield {
val id = n.toString
region ! Ping(id)
id → expectMsgType[ActorRef]
}).toMap
val firstAddress = node(first).address
val secondAddress = node(second).address
val hosts = locations.values.map(ref ⇒ fillAddress(ref.path.address)).toSet
hosts should ===(Set(firstAddress, secondAddress))
}
runOn(third) {
val locations = (for (n ← 1 to 10) yield {
val id = n.toString
region ! Ping(id)
val ref1 = expectMsgType[ActorRef]
region ! Ping(id)
val ref2 = expectMsgType[ActorRef]
ref1 should ===(ref2)
id → ref1
}).toMap
val thirdAddress = node(third).address
val fourthAddress = node(fourth).address
val hosts = locations.values.map(ref ⇒ fillAddress(ref.path.address)).toSet
hosts should ===(Set(thirdAddress, fourthAddress))
}
enterBarrier("after-2")
}
"not mix entities in different teams" in {
runOn(second) {
region ! GetCount("5")
expectMsg(1)
}
runOn(fourth) {
region ! GetCount("5")
expectMsg(2)
}
enterBarrier("after-3")
}
"allow proxy within same team" in {
runOn(second) {
val proxy = ClusterSharding(system).startProxy(
typeName = "Entity",
role = None,
team = None, // by default use own team
extractEntityId = extractEntityId,
extractShardId = extractShardId)
proxy ! GetCount("5")
expectMsg(1)
}
enterBarrier("after-4")
}
"allow proxy across different teams" in {
runOn(second) {
val proxy = ClusterSharding(system).startProxy(
typeName = "Entity",
role = None,
team = Some("DC2"), // proxy to other DC
extractEntityId = extractEntityId,
extractShardId = extractShardId)
proxy ! GetCount("5")
expectMsg(2)
}
enterBarrier("after-5")
}
}
}

View file

@@ -176,13 +176,13 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
identifyTimer = None
}
private val targetTeam = settings.team match {
private val targetTeamRole = settings.team match {
case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t
case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team
}
def matchingRole(member: Member): Boolean =
member.hasRole(targetTeam) && role.forall(member.hasRole)
member.hasRole(targetTeamRole) && role.forall(member.hasRole)
def handleInitial(state: CurrentClusterState): Unit = {
trackChange {

View file

@@ -12,6 +12,7 @@ import akka.cluster.Cluster
import akka.testkit.ImplicitSender
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.cluster.ClusterSettings
object TeamSingletonManagerSpec extends MultiNodeConfig {
val controller = role("controller")
@@ -100,10 +101,10 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag
pong.fromTeam should equal(Cluster(system).settings.Team)
pong.roles should contain(worker)
runOn(controller, first) {
pong.roles should contain("team-one")
pong.roles should contain(ClusterSettings.TeamRolePrefix + "one")
}
runOn(second, third) {
pong.roles should contain("team-two")
pong.roles should contain(ClusterSettings.TeamRolePrefix + "two")
}
enterBarrier("after-1")
@@ -118,7 +119,7 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag
val pong = expectMsgType[TeamSingleton.Pong](10.seconds)
pong.fromTeam should ===("one")
pong.roles should contain(worker)
pong.roles should contain("team-one")
pong.roles should contain(ClusterSettings.TeamRolePrefix + "one")
}
enterBarrier("after-1")
}

View file

@@ -34,6 +34,7 @@ object ClusterSettings {
}
final class ClusterSettings(val config: Config, val systemName: String) {
import ClusterSettings._
private val cc = config.getConfig("akka.cluster")

View file

@@ -46,7 +46,7 @@ class ClusterConfigSpec extends AkkaSpec {
MinNrOfMembers should ===(1)
MinNrOfMembersOfRole should ===(Map.empty[String, Int])
Team should ===("default")
Roles should ===(Set("team-default"))
Roles should ===(Set(ClusterSettings.TeamRolePrefix + "default"))
JmxEnabled should ===(true)
UseDispatcher should ===(Dispatchers.DefaultDispatcherId)
GossipDifferentViewProbability should ===(0.8 +- 0.0001)
@@ -66,7 +66,7 @@ class ClusterConfigSpec extends AkkaSpec {
|}
""".stripMargin).withFallback(ConfigFactory.load()), system.name)
import settings._
Roles should ===(Set("hamlet", "team-blue"))
Roles should ===(Set("hamlet", ClusterSettings.TeamRolePrefix + "blue"))
Team should ===("blue")
}
}

View file

@@ -34,7 +34,7 @@ class DistributedData(system: ExtendedActorSystem) extends Extension {
* Returns true if this member is not tagged with the role configured for the
* replicas.
*/
def isTerminated: Boolean = Cluster(system).isTerminated || !settings.role.forall(Cluster(system).selfRoles.contains)
def isTerminated: Boolean = Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles)
/**
* `ActorRef` of the [[Replicator]] .

View file

@@ -48,6 +48,9 @@ import akka.actor.Cancellable
import scala.util.control.NonFatal
import akka.cluster.ddata.Key.KeyId
import akka.annotation.InternalApi
import scala.collection.immutable.TreeSet
import akka.cluster.MemberStatus
import scala.annotation.varargs
object ReplicatorSettings {
@@ -98,8 +101,8 @@ object ReplicatorSettings {
}
/**
* @param role Replicas are running on members tagged with this role.
* All members are used if undefined.
* @param roles Replicas are running on members tagged with these roles.
* The member must have all given roles. All members are used if empty.
* @param gossipInterval How often the Replicator should send out gossip information.
* @param notifySubscribersInterval How often the subscribers will be notified
* of changes, if any.
@@ -124,7 +127,7 @@ object ReplicatorSettings {
* in the `Set`.
*/
final class ReplicatorSettings(
val role: Option[String],
val roles: Set[String],
val gossipInterval: FiniteDuration,
val notifySubscribersInterval: FiniteDuration,
val maxDeltaElements: Int,
@@ -138,10 +141,29 @@ final class ReplicatorSettings(
val deltaCrdtEnabled: Boolean,
val maxDeltaSize: Int) {
// for backwards compatibility
def this(
role: Option[String],
gossipInterval: FiniteDuration,
notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int,
dispatcher: String,
pruningInterval: FiniteDuration,
maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props],
durableKeys: Set[KeyId],
pruningMarkerTimeToLive: FiniteDuration,
durablePruningMarkerTimeToLive: FiniteDuration,
deltaCrdtEnabled: Boolean,
maxDeltaSize: Int) =
this(role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive,
deltaCrdtEnabled, maxDeltaSize)
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
this(roles = role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true, 200)
// For backwards compatibility
@@ -161,9 +183,20 @@ final class ReplicatorSettings(
maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive,
deltaCrdtEnabled, 200)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
def withRole(role: String): ReplicatorSettings = copy(roles = ReplicatorSettings.roleOption(role).toSet)
def withRole(role: Option[String]): ReplicatorSettings = copy(role = role)
def withRole(role: Option[String]): ReplicatorSettings = copy(roles = role.toSet)
@varargs
def withRoles(roles: String*): ReplicatorSettings = copy(roles = roles.toSet)
/**
* INTERNAL API
*/
@InternalApi private[akka] def withRoles(roles: Set[String]): ReplicatorSettings = copy(roles = roles)
// for backwards compatibility
def role: Option[String] = roles.headOption
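
A sketch of how the new roles support is meant to be combined, mirroring what the sharding guardian above does internally (the "shard" role name and DC1 team are illustrative):

import akka.cluster.ClusterSettings
import akka.cluster.ddata.ReplicatorSettings

val replicatorSettings = ReplicatorSettings(system)
  .withRoles("shard", ClusterSettings.TeamRolePrefix + "DC1")
// the Replicator then runs only on members carrying all of these roles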
def withGossipInterval(gossipInterval: FiniteDuration): ReplicatorSettings =
copy(gossipInterval = gossipInterval)
@@ -216,7 +249,7 @@ final class ReplicatorSettings(
copy(maxDeltaSize = maxDeltaSize)
private def copy(
role: Option[String] = role,
roles: Set[String] = roles,
gossipInterval: FiniteDuration = gossipInterval,
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
maxDeltaElements: Int = maxDeltaElements,
@@ -229,7 +262,7 @@ final class ReplicatorSettings(
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
deltaCrdtEnabled: Boolean = deltaCrdtEnabled,
maxDeltaSize: Int = maxDeltaSize): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
new ReplicatorSettings(roles, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled, maxDeltaSize)
}
@@ -988,8 +1021,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
require(!cluster.isTerminated, "Cluster node must not be terminated")
require(
role.forall(cluster.selfRoles.contains),
s"This cluster member [${selfAddress}] doesn't have the role [$role]")
roles.subsetOf(cluster.selfRoles),
s"This cluster member [${selfAddress}] doesn't have all the roles [${roles.mkString(", ")}]")
//Start periodic gossip to random nodes in cluster
import context.dispatcher
@@ -1057,8 +1090,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
var weaklyUpNodes: Set[Address] = Set.empty
var removedNodes: Map[UniqueAddress, Long] = Map.empty
var leader: Option[Address] = None
def isLeader: Boolean = leader.exists(_ == selfAddress)
// all nodes sorted with the leader first
var leader: TreeSet[Member] = TreeSet.empty(Member.leaderStatusOrdering)
def isLeader: Boolean =
leader.nonEmpty && leader.head.address == selfAddress && leader.head.status == MemberStatus.Up
// for pruning timeouts are based on clock that is only increased when all nodes are reachable
var previousClockTime = System.nanoTime()
@@ -1099,9 +1134,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def preStart(): Unit = {
if (hasDurableKeys)
durableStore ! LoadAll
val leaderChangedClass = if (role.isDefined) classOf[RoleLeaderChanged] else classOf[LeaderChanged]
// not using LeaderChanged/RoleLeaderChanged because we need a single leader node independent of team
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[ReachabilityEvent], leaderChangedClass)
classOf[MemberEvent], classOf[ReachabilityEvent])
}
override def postStop(): Unit = {
@@ -1113,7 +1148,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
clockTask.cancel()
}
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
def matchingRole(m: Member): Boolean = roles.subsetOf(m.roles)
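
With a set of roles a member matches only if it carries every configured role; the empty set is a subset of any set, so an empty configuration matches all members. Illustrative checks:

Set.empty[String].subsetOf(Set("team-DC1"))                 // true: nothing configured
Set("team-DC1").subsetOf(Set("team-DC1", "shard"))          // true: member has the team role
Set("team-DC2", "shard").subsetOf(Set("team-DC1", "shard")) // false: wrong team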
override val supervisorStrategy = {
def fromDurableStore: Boolean = sender() == durableStore && sender() != context.system.deadLetters
@@ -1204,11 +1239,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case MemberWeaklyUp(m) ⇒ receiveWeaklyUpMemberUp(m)
case MemberUp(m) ⇒ receiveMemberUp(m)
case MemberRemoved(m, _) ⇒ receiveMemberRemoved(m)
case _: MemberEvent ⇒ // not of interest
case evt: MemberEvent ⇒ receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) ⇒ receiveUnreachable(m)
case ReachableMember(m) ⇒ receiveReachable(m)
case LeaderChanged(leader) ⇒ receiveLeaderChanged(leader, None)
case RoleLeaderChanged(role, leader) ⇒ receiveLeaderChanged(leader, Some(role))
case GetKeyIds ⇒ receiveGetKeyIds()
case Delete(key, consistency, req) ⇒ receiveDelete(key, consistency, req)
case RemovedNodePruningTick ⇒ receiveRemovedNodePruningTick()
@@ -1695,15 +1728,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
weaklyUpNodes += m.address
def receiveMemberUp(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress) {
nodes += m.address
weaklyUpNodes -= m.address
if (matchingRole(m)) {
leader += m
if (m.address != selfAddress) {
nodes += m.address
weaklyUpNodes -= m.address
}
}
def receiveMemberRemoved(m: Member): Unit = {
if (m.address == selfAddress)
context stop self
else if (matchingRole(m)) {
leader -= m
nodes -= m.address
weaklyUpNodes -= m.address
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
@@ -1713,15 +1750,18 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveOtherMemberEvent(m: Member): Unit =
if (matchingRole(m)) {
// update changed status
leader = (leader - m) + m
}
def receiveUnreachable(m: Member): Unit =
if (matchingRole(m)) unreachable += m.address
def receiveReachable(m: Member): Unit =
if (matchingRole(m)) unreachable -= m.address
def receiveLeaderChanged(leaderOption: Option[Address], roleOption: Option[String]): Unit =
if (roleOption == role) leader = leaderOption
def receiveClockTick(): Unit = {
val now = System.nanoTime()
if (unreachable.isEmpty)

View file

@@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.ActorRef
import scala.concurrent.Await
import akka.cluster.MemberStatus
object DurablePruningSpec extends MultiNodeConfig {
val first = role("first")
@@ -73,6 +74,13 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
val replicator2 = startReplicator(sys2)
val probe2 = TestProbe()(sys2)
Cluster(sys2).join(node(first).address)
awaitAssert({
Cluster(system).state.members.size should ===(4)
Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up))
Cluster(sys2).state.members.size should ===(4)
Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
}, 10.seconds)
enterBarrier("joined")
within(5.seconds) {
awaitAssert {

View file

@@ -1221,6 +1221,14 @@ object MiMa extends AutoPlugin {
// #22881 Make sure connections are aborted correctly on Windows
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"),
// #23231 multi-DC Sharding
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.leader"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveLeaderChanged"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.leader_="),
FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.proxyProps"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.this"),
// #23144 recoverWithRetries cleanup
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"),