From e0fe0bc49ec63c605b9e5a7cb1eb281fc4923673 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 26 Jun 2017 15:03:33 +0200 Subject: [PATCH] Make cluster sharding DC aware, #23231 * Sharding only within own team (coordinator is singleton) * the ddata Replicator used by Sharding must also be only within own team * added support for Set of roles in ddata Replicator so that can be used by sharding to specify role + team * Sharding proxy can route to sharding in another team --- .../cluster/sharding/ClusterSharding.scala | 106 ++++++++- .../akka/cluster/sharding/ShardRegion.scala | 17 +- .../sharding/ClusterShardingSpec.scala | 1 + .../sharding/TeamClusterShardingSpec.scala | 218 ++++++++++++++++++ .../singleton/ClusterSingletonProxy.scala | 4 +- .../singleton/TeamSingletonManagerSpec.scala | 7 +- .../scala/akka/cluster/ClusterSettings.scala | 1 + .../akka/cluster/ClusterConfigSpec.scala | 4 +- .../akka/cluster/ddata/DistributedData.scala | 2 +- .../scala/akka/cluster/ddata/Replicator.scala | 88 +++++-- .../cluster/ddata/DurablePruningSpec.scala | 8 + project/MiMa.scala | 8 + 12 files changed, 417 insertions(+), 47 deletions(-) create mode 100644 akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 3998ba0bfe..4779a79e48 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -29,6 +29,7 @@ import akka.cluster.ddata.ReplicatorSettings import akka.cluster.ddata.Replicator import scala.util.control.NonFatal import akka.actor.Status +import akka.cluster.ClusterSettings /** * This extension provides sharding functionality of actors in a cluster. @@ -341,16 +342,53 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { typeName: String, role: Option[String], extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId): ActorRef = + startProxy(typeName, role, team = None, extractEntityId, extractShardId) + + /** + * Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, + * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any + * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * [[#shardRegion]] method. + * + * Some settings can be configured as described in the `akka.cluster.sharding` section + * of the `reference.conf`. + * + * @param typeName the name of the entity type + * @param role specifies that this entity type is located on cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param team The team of the cluster nodes where the cluster sharding is running. + * If None then the same team as current node. + * @param extractEntityId partial function to extract the entity id and the message to send to the + * entity from the incoming message, if the partial function does not match the message will + * be `unhandled`, i.e. 
posted as `Unhandled` messages on the event stream + * @param extractShardId function to determine the shard id for an incoming message, only messages + * that passed the `extractEntityId` will be used + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard + */ + def startProxy( + typeName: String, + role: Option[String], + team: Option[String], + extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId): ActorRef = { implicit val timeout = system.settings.CreationTimeout val settings = ClusterShardingSettings(system).withRole(role) - val startMsg = StartProxy(typeName, settings, extractEntityId, extractShardId) + val startMsg = StartProxy(typeName, team, settings, extractEntityId, extractShardId) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) - regions.put(typeName, shardRegion) + // it must be possible to start several proxies, one per team + regions.put(proxyName(typeName, team), shardRegion) shardRegion } + private def proxyName(typeName: String, team: Option[String]): String = { + team match { + case None ⇒ typeName + case Some(t) ⇒ typeName + "-" + t + } + } + /** * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any @@ -370,9 +408,34 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def startProxy( typeName: String, role: Optional[String], + messageExtractor: ShardRegion.MessageExtractor): ActorRef = + startProxy(typeName, role, team = Optional.empty(), messageExtractor) + + /** + * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, + * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any + * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * [[#shardRegion]] method. + * + * Some settings can be configured as described in the `akka.cluster.sharding` section + * of the `reference.conf`. + * + * @param typeName the name of the entity type + * @param role specifies that this entity type is located on cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param team The team of the cluster nodes where the cluster sharding is running. + * If None then the same team as current node. + * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the + * entity from the incoming message + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard + */ + def startProxy( + typeName: String, + role: Optional[String], + team: Optional[String], messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - startProxy(typeName, Option(role.orElse(null)), + startProxy(typeName, Option(role.orElse(null)), Option(team.orElse(null)), extractEntityId = { case msg if messageExtractor.entityId(msg) ne null ⇒ (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) @@ -383,14 +446,28 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { /** * Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entity type. - * The entity type must be registered with the [[#start]] method before it can be used here. - * Messages to the entity is always sent via the `ShardRegion`. 
+ * The entity type must be registered with the [[#start]] or [[#startProxy]] method before it + * can be used here. Messages to the entity is always sent via the `ShardRegion`. */ def shardRegion(typeName: String): ActorRef = regions.get(typeName) match { case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first") case ref ⇒ ref } + /** + * Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the + * named entity type running in another team. A proxy within the same team can be accessed + * with [[#shardRegion]] instead of this method. The entity type must be registered with the + * [[#startProxy]] method before it can be used here. Messages to the entity is always sent + * via the `ShardRegion`. + */ + def shardRegionProxy(typeName: String, team: String): ActorRef = { + regions.get(proxyName(typeName, Some(team))) match { + case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first") + case ref ⇒ ref + } + } + } /** @@ -402,7 +479,7 @@ private[akka] object ClusterShardingGuardian { extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded - final case class StartProxy(typeName: String, settings: ClusterShardingSettings, + final case class StartProxy(typeName: String, team: Option[String], settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded @@ -441,7 +518,9 @@ private[akka] class ClusterShardingGuardian extends Actor { case Some(r) ⇒ URLEncoder.encode(r, ByteString.UTF_8) + "Replicator" case None ⇒ "replicator" } - val ref = context.actorOf(Replicator.props(replicatorSettings.withRole(settings.role)), name) + // Use members within the team and with the given role (if any) + val replicatorRoles = Set(ClusterSettings.TeamRolePrefix + cluster.settings.Team) ++ settings.role + val ref = context.actorOf(Replicator.props(replicatorSettings.withRoles(replicatorRoles)), name) replicatorByRole = replicatorByRole.updated(settings.role, ref) ref } @@ -505,22 +584,29 @@ private[akka] class ClusterShardingGuardian extends Actor { sender() ! Status.Failure(e) } - case StartProxy(typeName, settings, extractEntityId, extractShardId) ⇒ + case StartProxy(typeName, team, settings, extractEntityId, extractShardId) ⇒ try { + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) - val shardRegion = context.child(encName).getOrElse { + // it must be possible to start several proxies, one per team + val actorName = team match { + case None ⇒ encName + case Some(t) ⇒ URLEncoder.encode(typeName + "-" + t, ByteString.UTF_8) + } + val shardRegion = context.child(actorName).getOrElse { context.actorOf( ShardRegion.proxyProps( typeName = typeName, + team = team, settings = settings, coordinatorPath = cPath, extractEntityId = extractEntityId, extractShardId = extractShardId, replicator = context.system.deadLetters, majorityMinCap).withDispatcher(context.props.dispatcher), - name = encName) + name = actorName) } sender() ! 
Started(shardRegion) } catch { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 3afe6746f7..2d9b8eb0e4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -20,6 +20,7 @@ import scala.concurrent.Future import scala.reflect.ClassTag import scala.concurrent.Promise import akka.Done +import akka.cluster.ClusterSettings /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -40,7 +41,7 @@ object ShardRegion { handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int): Props = - Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, extractEntityId, + Props(new ShardRegion(typeName, Some(entityProps), team = None, settings, coordinatorPath, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) /** @@ -50,13 +51,14 @@ object ShardRegion { */ private[akka] def proxyProps( typeName: String, + team: Option[String], settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, replicator: ActorRef, majorityMinCap: Int): Props = - Props(new ShardRegion(typeName, None, settings, coordinatorPath, extractEntityId, extractShardId, + Props(new ShardRegion(typeName, None, team, settings, coordinatorPath, extractEntityId, extractShardId, PoisonPill, replicator, majorityMinCap)).withDeploy(Deploy.local) /** @@ -365,6 +367,7 @@ object ShardRegion { private[akka] class ShardRegion( typeName: String, entityProps: Option[Props], + team: Option[String], settings: ClusterShardingSettings, coordinatorPath: String, extractEntityId: ShardRegion.ExtractEntityId, @@ -419,11 +422,15 @@ private[akka] class ShardRegion( retryTask.cancel() } - def matchingRole(member: Member): Boolean = role match { - case None ⇒ true - case Some(r) ⇒ member.hasRole(r) + // when using proxy the team can be different that the own team + private val targetTeamRole = team match { + case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t + case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team } + def matchingRole(member: Member): Boolean = + member.hasRole(targetTeamRole) && role.forall(member.hasRole) + def coordinatorSelection: Option[ActorSelection] = membersByAge.headOption.map(m ⇒ context.actorSelection(RootActorPath(m.address) + coordinatorPath)) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 84d550e2f8..77ae4d0c2f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -460,6 +460,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu val proxy = system.actorOf( ShardRegion.proxyProps( typeName = "counter", + team = None, settings, coordinatorPath = "/user/counterCoordinator/singleton/coordinator", extractEntityId = extractEntityId, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala new file mode 100644 index 0000000000..9710faccd4 --- 
/dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/TeamClusterShardingSpec.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus +import akka.cluster.sharding.ShardRegion.CurrentRegions +import akka.cluster.sharding.ShardRegion.GetCurrentRegions +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object TeamClusterShardingSpec { + sealed trait EntityMsg { + def id: String + } + final case class Ping(id: String) extends EntityMsg + final case class GetCount(id: String) extends EntityMsg + + class Entity extends Actor { + var count = 0 + def receive = { + case Ping(_) ⇒ + count += 1 + sender() ! self + case GetCount(_) ⇒ + sender() ! count + } + } + + val extractEntityId: ShardRegion.ExtractEntityId = { + case m: EntityMsg ⇒ (m.id, m) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case m: EntityMsg ⇒ m.id.charAt(0).toString + } +} + +object TeamClusterShardingSpecConfig extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + """)) + + nodeConfig(first, second) { + ConfigFactory.parseString("akka.cluster.team = DC1") + } + + nodeConfig(third, fourth) { + ConfigFactory.parseString("akka.cluster.team = DC2") + } +} + +class TeamClusterShardingMultiJvmNode1 extends TeamClusterShardingSpec +class TeamClusterShardingMultiJvmNode2 extends TeamClusterShardingSpec +class TeamClusterShardingMultiJvmNode3 extends TeamClusterShardingSpec +class TeamClusterShardingMultiJvmNode4 extends TeamClusterShardingSpec + +abstract class TeamClusterShardingSpec extends MultiNodeSpec(TeamClusterShardingSpecConfig) + with STMultiNodeSpec with ImplicitSender { + import TeamClusterShardingSpec._ + import TeamClusterShardingSpecConfig._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + startSharding() + within(15.seconds) { + awaitAssert(cluster.state.members.exists { m ⇒ + m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up + } should be(true)) + } + } + enterBarrier(from.name + "-joined") + } + + def startSharding(): Unit = { + ClusterSharding(system).start( + typeName = "Entity", + entityProps = Props[Entity], + settings = ClusterShardingSettings(system), + extractEntityId = extractEntityId, + extractShardId = extractShardId) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + private def fillAddress(a: Address): Address = + if (a.hasLocalScope) Cluster(system).selfAddress else a + + private def assertCurrentRegions(expected: Set[Address]): Unit = { + awaitAssert({ + val p = TestProbe() + region.tell(GetCurrentRegions, p.ref) + p.expectMsg(CurrentRegions(expected)) + }, 10.seconds) + } + + s"Cluster sharding with teams" must { + "join 
cluster" in within(20.seconds) { + join(first, first) + join(second, first) + join(third, first) + join(fourth, first) + + awaitAssert({ + Cluster(system).state.members.size should ===(4) + Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + }, 10.seconds) + + runOn(first, second) { + assertCurrentRegions(Set(first, second).map(r ⇒ node(r).address)) + } + runOn(third, fourth) { + assertCurrentRegions(Set(third, fourth).map(r ⇒ node(r).address)) + } + + enterBarrier("after-1") + } + + "initialize shards" in { + runOn(first) { + val locations = (for (n ← 1 to 10) yield { + val id = n.toString + region ! Ping(id) + id → expectMsgType[ActorRef] + }).toMap + val firstAddress = node(first).address + val secondAddress = node(second).address + val hosts = locations.values.map(ref ⇒ fillAddress(ref.path.address)).toSet + hosts should ===(Set(firstAddress, secondAddress)) + } + runOn(third) { + val locations = (for (n ← 1 to 10) yield { + val id = n.toString + region ! Ping(id) + val ref1 = expectMsgType[ActorRef] + region ! Ping(id) + val ref2 = expectMsgType[ActorRef] + ref1 should ===(ref2) + id → ref1 + }).toMap + val thirdAddress = node(third).address + val fourthAddress = node(fourth).address + val hosts = locations.values.map(ref ⇒ fillAddress(ref.path.address)).toSet + hosts should ===(Set(thirdAddress, fourthAddress)) + } + enterBarrier("after-2") + } + + "not mix entities in different teams" in { + runOn(second) { + region ! GetCount("5") + expectMsg(1) + } + runOn(fourth) { + region ! GetCount("5") + expectMsg(2) + } + enterBarrier("after-3") + } + + "allow proxy within same team" in { + runOn(second) { + val proxy = ClusterSharding(system).startProxy( + typeName = "Entity", + role = None, + team = None, // by default use own team + extractEntityId = extractEntityId, + extractShardId = extractShardId) + + proxy ! GetCount("5") + expectMsg(1) + } + enterBarrier("after-4") + } + + "allow proxy across different teams" in { + runOn(second) { + val proxy = ClusterSharding(system).startProxy( + typeName = "Entity", + role = None, + team = Some("DC2"), // proxy to other DC + extractEntityId = extractEntityId, + extractShardId = extractShardId) + + proxy ! 
GetCount("5") + expectMsg(2) + } + enterBarrier("after-5") + } + + } +} + diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index a31dc53035..3e51c493f5 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -176,13 +176,13 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste identifyTimer = None } - private val targetTeam = settings.team match { + private val targetTeamRole = settings.team match { case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team } def matchingRole(member: Member): Boolean = - member.hasRole(targetTeam) && role.forall(member.hasRole) + member.hasRole(targetTeamRole) && role.forall(member.hasRole) def handleInitial(state: CurrentClusterState): Unit = { trackChange { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala index dd63e8ae70..4cecea2f3c 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala @@ -12,6 +12,7 @@ import akka.cluster.Cluster import akka.testkit.ImplicitSender import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.cluster.ClusterSettings object TeamSingletonManagerSpec extends MultiNodeConfig { val controller = role("controller") @@ -100,10 +101,10 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag pong.fromTeam should equal(Cluster(system).settings.Team) pong.roles should contain(worker) runOn(controller, first) { - pong.roles should contain("team-one") + pong.roles should contain(ClusterSettings.TeamRolePrefix + "one") } runOn(second, third) { - pong.roles should contain("team-two") + pong.roles should contain(ClusterSettings.TeamRolePrefix + "two") } enterBarrier("after-1") @@ -118,7 +119,7 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag val pong = expectMsgType[TeamSingleton.Pong](10.seconds) pong.fromTeam should ===("one") pong.roles should contain(worker) - pong.roles should contain("team-one") + pong.roles should contain(ClusterSettings.TeamRolePrefix + "one") } enterBarrier("after-1") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index f6c8fca61d..ddc7b7717d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -34,6 +34,7 @@ object ClusterSettings { } final class ClusterSettings(val config: Config, val systemName: String) { + import ClusterSettings._ import ClusterSettings._ private val cc = config.getConfig("akka.cluster") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 171e67d42c..3792ec722f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -46,7 +46,7 @@ class ClusterConfigSpec extends AkkaSpec 
{ MinNrOfMembers should ===(1) MinNrOfMembersOfRole should ===(Map.empty[String, Int]) Team should ===("default") - Roles should ===(Set("team-default")) + Roles should ===(Set(ClusterSettings.TeamRolePrefix + "default")) JmxEnabled should ===(true) UseDispatcher should ===(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability should ===(0.8 +- 0.0001) @@ -66,7 +66,7 @@ class ClusterConfigSpec extends AkkaSpec { |} """.stripMargin).withFallback(ConfigFactory.load()), system.name) import settings._ - Roles should ===(Set("hamlet", "team-blue")) + Roles should ===(Set("hamlet", ClusterSettings.TeamRolePrefix + "blue")) Team should ===("blue") } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala index c941f4a97b..9ef8823242 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala @@ -34,7 +34,7 @@ class DistributedData(system: ExtendedActorSystem) extends Extension { * Returns true if this member is not tagged with the role configured for the * replicas. */ - def isTerminated: Boolean = Cluster(system).isTerminated || !settings.role.forall(Cluster(system).selfRoles.contains) + def isTerminated: Boolean = Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles) /** * `ActorRef` of the [[Replicator]] . diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 795a1fce2b..151ef0a5ed 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -48,6 +48,9 @@ import akka.actor.Cancellable import scala.util.control.NonFatal import akka.cluster.ddata.Key.KeyId import akka.annotation.InternalApi +import scala.collection.immutable.TreeSet +import akka.cluster.MemberStatus +import scala.annotation.varargs object ReplicatorSettings { @@ -98,8 +101,8 @@ object ReplicatorSettings { } /** - * @param role Replicas are running on members tagged with this role. - * All members are used if undefined. + * @param roles Replicas are running on members tagged with these roles. + * The member must have all given roles. All members are used if empty. * @param gossipInterval How often the Replicator should send out gossip information. * @param notifySubscribersInterval How often the subscribers will be notified * of changes, if any. @@ -124,7 +127,7 @@ object ReplicatorSettings { * in the `Set`. 
*/ final class ReplicatorSettings( - val role: Option[String], + val roles: Set[String], val gossipInterval: FiniteDuration, val notifySubscribersInterval: FiniteDuration, val maxDeltaElements: Int, @@ -138,10 +141,29 @@ final class ReplicatorSettings( val deltaCrdtEnabled: Boolean, val maxDeltaSize: Int) { + // for backwards compatibility + def this( + role: Option[String], + gossipInterval: FiniteDuration, + notifySubscribersInterval: FiniteDuration, + maxDeltaElements: Int, + dispatcher: String, + pruningInterval: FiniteDuration, + maxPruningDissemination: FiniteDuration, + durableStoreProps: Either[(String, Config), Props], + durableKeys: Set[KeyId], + pruningMarkerTimeToLive: FiniteDuration, + durablePruningMarkerTimeToLive: FiniteDuration, + deltaCrdtEnabled: Boolean, + maxDeltaSize: Int) = + this(role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, + maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, + deltaCrdtEnabled, maxDeltaSize) + // For backwards compatibility def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) = - this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, + this(roles = role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true, 200) // For backwards compatibility @@ -161,9 +183,20 @@ final class ReplicatorSettings( maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled, 200) - def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role)) + def withRole(role: String): ReplicatorSettings = copy(roles = ReplicatorSettings.roleOption(role).toSet) - def withRole(role: Option[String]): ReplicatorSettings = copy(role = role) + def withRole(role: Option[String]): ReplicatorSettings = copy(roles = role.toSet) + + @varargs + def withRoles(roles: String*): ReplicatorSettings = copy(roles = roles.toSet) + + /** + * INTERNAL API + */ + @InternalApi private[akka] def withRoles(roles: Set[String]): ReplicatorSettings = copy(roles = roles) + + // for backwards compatibility + def role: Option[String] = roles.headOption def withGossipInterval(gossipInterval: FiniteDuration): ReplicatorSettings = copy(gossipInterval = gossipInterval) @@ -216,7 +249,7 @@ final class ReplicatorSettings( copy(maxDeltaSize = maxDeltaSize) private def copy( - role: Option[String] = role, + roles: Set[String] = roles, gossipInterval: FiniteDuration = gossipInterval, notifySubscribersInterval: FiniteDuration = notifySubscribersInterval, maxDeltaElements: Int = maxDeltaElements, @@ -229,7 +262,7 @@ final class ReplicatorSettings( durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive, deltaCrdtEnabled: Boolean = deltaCrdtEnabled, maxDeltaSize: Int = maxDeltaSize): ReplicatorSettings = - new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, + new ReplicatorSettings(roles, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, 
deltaCrdtEnabled, maxDeltaSize) } @@ -988,8 +1021,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog require(!cluster.isTerminated, "Cluster node must not be terminated") require( - role.forall(cluster.selfRoles.contains), - s"This cluster member [${selfAddress}] doesn't have the role [$role]") + roles.subsetOf(cluster.selfRoles), + s"This cluster member [${selfAddress}] doesn't have all the roles [${roles.mkString(", ")}]") //Start periodic gossip to random nodes in cluster import context.dispatcher @@ -1057,8 +1090,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog var weaklyUpNodes: Set[Address] = Set.empty var removedNodes: Map[UniqueAddress, Long] = Map.empty - var leader: Option[Address] = None - def isLeader: Boolean = leader.exists(_ == selfAddress) + // all nodes sorted with the leader first + var leader: TreeSet[Member] = TreeSet.empty(Member.leaderStatusOrdering) + def isLeader: Boolean = + leader.nonEmpty && leader.head.address == selfAddress && leader.head.status == MemberStatus.Up // for pruning timeouts are based on clock that is only increased when all nodes are reachable var previousClockTime = System.nanoTime() @@ -1099,9 +1134,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog override def preStart(): Unit = { if (hasDurableKeys) durableStore ! LoadAll - val leaderChangedClass = if (role.isDefined) classOf[RoleLeaderChanged] else classOf[LeaderChanged] + // not using LeaderChanged/RoleLeaderChanged because here we need one node independent of team cluster.subscribe(self, initialStateMode = InitialStateAsEvents, - classOf[MemberEvent], classOf[ReachabilityEvent], leaderChangedClass) + classOf[MemberEvent], classOf[ReachabilityEvent]) } override def postStop(): Unit = { @@ -1113,7 +1148,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog clockTask.cancel() } - def matchingRole(m: Member): Boolean = role.forall(m.hasRole) + def matchingRole(m: Member): Boolean = roles.subsetOf(m.roles) override val supervisorStrategy = { def fromDurableStore: Boolean = sender() == durableStore && sender() != context.system.deadLetters @@ -1204,11 +1239,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case MemberWeaklyUp(m) ⇒ receiveWeaklyUpMemberUp(m) case MemberUp(m) ⇒ receiveMemberUp(m) case MemberRemoved(m, _) ⇒ receiveMemberRemoved(m) - case _: MemberEvent ⇒ // not of interest + case evt: MemberEvent ⇒ receiveOtherMemberEvent(evt.member) case UnreachableMember(m) ⇒ receiveUnreachable(m) case ReachableMember(m) ⇒ receiveReachable(m) - case LeaderChanged(leader) ⇒ receiveLeaderChanged(leader, None) - case RoleLeaderChanged(role, leader) ⇒ receiveLeaderChanged(leader, Some(role)) case GetKeyIds ⇒ receiveGetKeyIds() case Delete(key, consistency, req) ⇒ receiveDelete(key, consistency, req) case RemovedNodePruningTick ⇒ receiveRemovedNodePruningTick() @@ -1695,15 +1728,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog weaklyUpNodes += m.address def receiveMemberUp(m: Member): Unit = - if (matchingRole(m) && m.address != selfAddress) { - nodes += m.address - weaklyUpNodes -= m.address + if (matchingRole(m)) { + leader += m + if (m.address != selfAddress) { + nodes += m.address + weaklyUpNodes -= m.address + } } def receiveMemberRemoved(m: Member): Unit = { if (m.address == selfAddress) context stop self else if (matchingRole(m)) { + leader -= m nodes -= m.address weaklyUpNodes -= 
m.address log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress) @@ -1713,15 +1750,18 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } + def receiveOtherMemberEvent(m: Member): Unit = + if (matchingRole(m)) { + // update changed status + leader = (leader - m) + m + } + def receiveUnreachable(m: Member): Unit = if (matchingRole(m)) unreachable += m.address def receiveReachable(m: Member): Unit = if (matchingRole(m)) unreachable -= m.address - def receiveLeaderChanged(leaderOption: Option[Address], roleOption: Option[String]): Unit = - if (roleOption == role) leader = leaderOption - def receiveClockTick(): Unit = { val now = System.nanoTime() if (unreachable.isEmpty) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala index 553f0d727f..daa222b28a 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala @@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem import akka.actor.ActorRef import scala.concurrent.Await +import akka.cluster.MemberStatus object DurablePruningSpec extends MultiNodeConfig { val first = role("first") @@ -73,6 +74,13 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN val replicator2 = startReplicator(sys2) val probe2 = TestProbe()(sys2) Cluster(sys2).join(node(first).address) + awaitAssert({ + Cluster(system).state.members.size should ===(4) + Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys2).state.members.size should ===(4) + Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + }, 10.seconds) + enterBarrier("joined") within(5.seconds) { awaitAssert { diff --git a/project/MiMa.scala b/project/MiMa.scala index 8eb9783025..5c55c43c01 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1221,6 +1221,14 @@ object MiMa extends AutoPlugin { // #22881 Make sure connections are aborted correctly on Windows ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"), + // #23231 multi-DC Sharding + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.leader"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveLeaderChanged"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.leader_="), + FilterAnyProblemStartingWith("akka.cluster.sharding.ClusterShardingGuardian"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.proxyProps"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.this"), + // #23144 recoverWithRetries cleanup ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"),
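Usage sketch (illustrative, not part of the patch): how the team-aware sharding API introduced above is meant to be used, following TeamClusterShardingSpec. The "Entity" type, its Props, the extractors and the "DC2" team name are placeholder values taken from that spec; `system` is assumed to be an ActorSystem on a node configured with akka.cluster.team = "DC1".

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

class Entity extends Actor { // placeholder entity, as in the spec
  def receive = { case msg ⇒ sender() ! msg }
}

def shardingExample(system: ActorSystem,
                    extractEntityId: ShardRegion.ExtractEntityId,
                    extractShardId: ShardRegion.ExtractShardId): Unit = {

  // Sharding of "Entity" runs only within this node's own team (akka.cluster.team).
  val region: ActorRef = ClusterSharding(system).start(
    typeName = "Entity",
    entityProps = Props[Entity],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

  // Proxy-only region that delegates to the "Entity" shards hosted by team DC2.
  val proxyToDC2: ActorRef = ClusterSharding(system).startProxy(
    typeName = "Entity",
    role = None,
    team = Some("DC2"),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

  // Cross-team proxies are registered under typeName + "-" + team and can be
  // retrieved again with the new shardRegionProxy method.
  val sameProxy: ActorRef = ClusterSharding(system).shardRegionProxy("Entity", team = "DC2")
}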
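A second minimal sketch, for the Set-of-roles support added to the ddata Replicator (used above by ClusterShardingGuardian to combine the team role with the configured sharding role). The "sharding" role and the "DC1" team are illustrative; TeamRolePrefix is assumed to be the "team-" role prefix from ClusterSettings shown in the patch.

import akka.actor.ActorSystem
import akka.cluster.ClusterSettings
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }

def startTeamReplicator(system: ActorSystem): Unit = {
  // Replicas run only on members that carry all of the given roles:
  // the team role plus an optional application role.
  val settings = ReplicatorSettings(system)
    .withRoles(ClusterSettings.TeamRolePrefix + "DC1", "sharding")
  system.actorOf(Replicator.props(settings), "teamReplicator")
}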