From 46fcca5f39bc7877dfdb2c3f43bc5f7eec55665a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 15 Oct 2019 12:20:41 +0200 Subject: [PATCH] Add API for multi-dc Sharding/Singleton in Typed, #27705 (#27974) * add withDataCenter in Entity, following the same pattern as the role * update cluster-dc.md, split into classic and new pages * fix bug in ClusterSharding shouldHostShard * contains on String * update multi-dc singleton sample --- .../typed/ClusterShardingSettings.scala | 2 +- .../typed/internal/ClusterShardingImpl.scala | 37 ++- .../typed/javadsl/ClusterSharding.scala | 44 +++- .../typed/scaladsl/ClusterSharding.scala | 48 +++- .../typed/MultiDcClusterShardingSpec.scala | 47 +++- .../typed/ShardingCompileOnlyTest.java | 22 ++ .../ClusterSingletonManagerSpec.scala | 2 +- .../typed/MultiDcClusterSingletonSpec.scala | 8 +- ...lusterActors.scala => MultiDcPinger.scala} | 16 +- .../typed/BasicClusterExampleTest.java | 16 ++ .../typed/SingletonCompileOnlyTest.java | 12 + .../typed/BasicClusterExampleSpec.scala | 17 ++ .../typed/SingletonCompileOnlySpec.scala | 8 +- akka-docs/src/main/paradox/cluster-dc.md | 175 +------------- akka-docs/src/main/paradox/index-cluster.md | 1 + .../src/main/paradox/typed/cluster-dc.md | 228 ++++++++++++++++++ akka-docs/src/main/paradox/typed/cluster.md | 2 +- .../src/main/paradox/typed/index-cluster.md | 2 +- 18 files changed, 480 insertions(+), 207 deletions(-) rename akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/{MultiDcClusterActors.scala => MultiDcPinger.scala} (61%) create mode 100644 akka-docs/src/main/paradox/typed/cluster-dc.md diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 86d6530b2f..05b1d055d3 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -283,7 +283,7 @@ final class ClusterShardingSettings( @InternalApi private[akka] def shouldHostShard(cluster: Cluster): Boolean = role.forall(cluster.selfMember.roles.contains) && - dataCenter.forall(cluster.selfMember.dataCenter.contains) + dataCenter.forall(_ == cluster.selfMember.dataCenter) // no withNumberOfShards because it should be defined in configuration to be able to verify same // value on all nodes with `JoinConfigCompatChecker` diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index de37cede77..965c5b8952 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -11,8 +11,10 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.ConcurrentHashMap import scala.compat.java8.FutureConverters._ + import akka.util.JavaDurationConverters._ import scala.concurrent.Future + import akka.actor.ActorRefProvider import akka.actor.ExtendedActorSystem import akka.actor.InternalActorRef @@ -28,6 +30,7 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors import 
akka.annotation.InternalApi +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion @@ -119,12 +122,15 @@ import akka.util.Timeout case Some(e) => e }).asInstanceOf[ShardingMessageExtractor[E, M]] + val settingsWithRole = entity.role.fold(settings)(settings.withRole) + val settingsWithDataCenter = entity.dataCenter.fold(settingsWithRole)(settingsWithRole.withDataCenter) + internalInit( entity.createBehavior, entity.entityProps, entity.typeKey, entity.stopMessage, - entity.role.fold(settings)(settings.withRole), + settingsWithDataCenter, extractor, entity.allocationStrategy) } @@ -142,7 +148,8 @@ import akka.util.Timeout settings = entity.settings.asScala, messageExtractor = entity.messageExtractor.asScala, allocationStrategy = entity.allocationStrategy.asScala, - role = entity.role.asScala)) + role = entity.role.asScala, + dataCenter = entity.dataCenter.asScala)) } private def internalInit[M, E]( @@ -240,6 +247,19 @@ import akka.util.Timeout typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) } + override def entityRefFor[M]( + typeKey: scaladsl.EntityTypeKey[M], + entityId: String, + dataCenter: DataCenter): scaladsl.EntityRef[M] = { + if (dataCenter == cluster.selfMember.dataCenter) + entityRefFor(typeKey, entityId) + else + new EntityRefImpl[M]( + classicSharding.shardRegionProxy(typeKey.name, dataCenter), + entityId, + typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) + } + override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = { new EntityRefImpl[M]( classicSharding.shardRegion(typeKey.name), @@ -247,6 +267,19 @@ import akka.util.Timeout typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) } + override def entityRefFor[M]( + typeKey: javadsl.EntityTypeKey[M], + entityId: String, + dataCenter: String): javadsl.EntityRef[M] = { + if (dataCenter == cluster.selfMember.dataCenter) + entityRefFor(typeKey, entityId) + else + new EntityRefImpl[M]( + classicSharding.shardRegionProxy(typeKey.name, dataCenter), + entityId, + typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) + } + override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index d08de2b06f..c2ca415efe 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -178,7 +178,8 @@ abstract class ClusterSharding { /** * Create an `ActorRef`-like reference to a specific sharded entity. - * Currently you have to correctly specify the type of messages the target can handle. + * + * You have to correctly specify the type of messages the target can handle via the `typeKey`. * * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the * here provided `entityId`. 
@@ -187,6 +188,18 @@ abstract class ClusterSharding { */ def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] + /** + * Create an `ActorRef`-like reference to a specific sharded entity running in another data center. + * + * You have to correctly specify the type of messages the target can handle via the `typeKey`. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * provided `entityId`. + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String, dataCenter: String): EntityRef[M] + /** * Actor for querying Cluster Sharding state */ @@ -220,6 +233,7 @@ object Entity { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()) } @@ -236,7 +250,8 @@ final class Entity[M, E] private ( val settings: Optional[ClusterShardingSettings], val messageExtractor: Optional[ShardingMessageExtractor[E, M]], val allocationStrategy: Optional[ShardAllocationStrategy], - val role: Optional[String]) { + val role: Optional[String], + val dataCenter: Optional[String]) { /** * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. @@ -276,7 +291,8 @@ final class Entity[M, E] private ( settings, Optional.ofNullable(newExtractor), allocationStrategy, - role) + role, + dataCenter) /** * Run the Entity actors on nodes with the given role. @@ -284,6 +300,14 @@ final class Entity[M, E] private ( def withRole(role: String): Entity[M, E] = copy(role = Optional.ofNullable(role)) + /** + * The data center of the cluster nodes where the cluster sharding is running. + * If the dataCenter is not specified the data center of the current node is used. If the given + * dataCenter does not match the data center of the current node the `ShardRegion` will be started + * in proxy mode. + */ + def withDataCenter(newDataCenter: String): Entity[M, E] = copy(dataCenter = Optional.ofNullable(newDataCenter)) + /** * Allocation strategy which decides on which nodes to allocate new shards, * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. 
@@ -298,8 +322,18 @@ final class Entity[M, E] private ( entityProps: Props = entityProps, settings: Optional[ClusterShardingSettings] = settings, allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy, - role: Optional[String] = role): Entity[M, E] = { - new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy, role) + role: Optional[String] = role, + dataCenter: Optional[String] = dataCenter): Entity[M, E] = { + new Entity( + createBehavior, + typeKey, + stopMessage, + entityProps, + settings, + messageExtractor, + allocationStrategy, + role, + dataCenter) } } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 0f0005a313..4a76998e98 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -20,6 +20,7 @@ import akka.actor.typed.Props import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.DoNotInherit import akka.annotation.InternalApi +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl @@ -178,7 +179,8 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding = /** * Create an `ActorRef`-like reference to a specific sharded entity. - * Currently you have to correctly specify the type of messages the target can handle. + * + * You have to correctly specify the type of messages the target can handle via the `typeKey`. * * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the * here provided `entityId`. @@ -187,6 +189,18 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding = */ def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] + /** + * Create an `ActorRef`-like reference to a specific sharded entity running in another data center. + * + * You have to correctly specify the type of messages the target can handle via the `typeKey`. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * provided `entityId`. + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String, dataCenter: DataCenter): EntityRef[M] + /** * Actor for querying Cluster Sharding state */ @@ -217,7 +231,7 @@ object Entity { */ def apply[M](typeKey: EntityTypeKey[M])( createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] = - new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None) + new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None, None) } /** @@ -231,7 +245,8 @@ final class Entity[M, E] private[akka] ( val settings: Option[ClusterShardingSettings], val messageExtractor: Option[ShardingMessageExtractor[E, M]], val allocationStrategy: Option[ShardAllocationStrategy], - val role: Option[String]) { + val role: Option[String], + val dataCenter: Option[DataCenter]) { /** * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. 
@@ -271,7 +286,8 @@ final class Entity[M, E] private[akka] ( settings, Option(newExtractor), allocationStrategy, - role) + role, + dataCenter) /** * Allocation strategy which decides on which nodes to allocate new shards, @@ -283,7 +299,15 @@ final class Entity[M, E] private[akka] ( /** * Run the Entity actors on nodes with the given role. */ - def withRole(role: String): Entity[M, E] = copy(role = Some(role)) + def withRole(newRole: String): Entity[M, E] = copy(role = Some(newRole)) + + /** + * The data center of the cluster nodes where the cluster sharding is running. + * If the dataCenter is not specified the data center of the current node is used. If the given + * dataCenter does not match the data center of the current node the `ShardRegion` will be started + * in proxy mode. + */ + def withDataCenter(newDataCenter: DataCenter): Entity[M, E] = copy(dataCenter = Some(newDataCenter)) private def copy( createBehavior: EntityContext[M] => Behavior[M] = createBehavior, @@ -292,8 +316,18 @@ final class Entity[M, E] private[akka] ( entityProps: Props = entityProps, settings: Option[ClusterShardingSettings] = settings, allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy, - role: Option[String] = role): Entity[M, E] = { - new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy, role) + role: Option[String] = role, + dataCenter: Option[DataCenter] = dataCenter): Entity[M, E] = { + new Entity( + createBehavior, + typeKey, + stopMessage, + entityProps, + settings, + messageExtractor, + allocationStrategy, + role, + dataCenter) } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala index 676d2c2636..dd56654716 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -9,7 +9,7 @@ import akka.actor.typed.ActorRef import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.Entity -import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec } +import akka.cluster.typed.{ MultiDcPinger, MultiNodeTypedClusterSpec } import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.actor.testkit.typed.scaladsl.TestProbe import akka.cluster.MultiNodeClusterSpec @@ -54,9 +54,9 @@ abstract class MultiDcClusterShardingSpec with ScalaFutures { import MultiDcClusterShardingSpecConfig._ - import MultiDcClusterActors._ + import MultiDcPinger._ - val typeKey = EntityTypeKey[PingProtocol]("ping") + val typeKey = EntityTypeKey[Command]("ping") val entityId = "ping-1" "Cluster sharding in multi dc cluster" must { "form cluster" in { formCluster(first, second, third, fourth) } "init sharding" in { val sharding = ClusterSharding(typedSystem) - val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.init(Entity(typeKey)(_ => multiDcPinger)) + val shardRegion: ActorRef[ShardingEnvelope[Command]] = sharding.init(Entity(typeKey)(_ => MultiDcPinger())) val probe = TestProbe[Pong] shardRegion ! 
ShardingEnvelope(entityId, Ping(probe.ref)) probe.expectMessage(max = 15.seconds, Pong(cluster.selfMember.dataCenter)) @@ -90,14 +90,45 @@ abstract class MultiDcClusterShardingSpec enterBarrier("ask") } - "be able to message cross dc via proxy" in { + "be able to message cross dc via proxy, defined with ClusterShardingSettings" in { runOn(first, second) { - val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).init( - Entity(typeKey)(_ => multiDcPinger).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) + val proxy: ActorRef[ShardingEnvelope[Command]] = ClusterSharding(typedSystem).init( + Entity(typeKey)(_ => MultiDcPinger()).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) val probe = TestProbe[Pong] proxy ! ShardingEnvelope(entityId, Ping(probe.ref)) probe.expectMessage(remainingOrDefault, Pong("dc2")) } - enterBarrier("done") + enterBarrier("cross-dc-1") + } + + "be able to message cross dc via proxy, defined with Entity" in { + runOn(first, second) { + val system = typedSystem + //#proxy-dc + val proxy: ActorRef[ShardingEnvelope[Command]] = + ClusterSharding(system).init(Entity(typeKey)(_ => MultiDcPinger()).withDataCenter("dc2")) + //#proxy-dc + val probe = TestProbe[Pong] + proxy ! ShardingEnvelope(entityId, Ping(probe.ref)) + probe.expectMessage(remainingOrDefault, Pong("dc2")) + } + enterBarrier("cross-dc-2") + } + + "be able to message cross dc via proxy, defined with EntityRef" in { + runOn(first, second) { + val system = typedSystem + //#proxy-dc-entityref + // it must still be started before usage + ClusterSharding(system).init(Entity(typeKey)(_ => MultiDcPinger()).withDataCenter("dc2")) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId, "dc2") + //#proxy-dc-entityref + + val probe = TestProbe[Pong] + entityRef ! 
Ping(probe.ref) + probe.expectMessage(remainingOrDefault, Pong("dc2")) + } + enterBarrier("cross-dc-3") } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 18decd7859..cee5d8b356 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -226,4 +226,26 @@ interface ShardingCompileOnlyTest { entityContext.getEntityTypeKey().name(), entityContext.getEntityId())))); // #persistence } + + public static void dataCenterExample() { + ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + String entityId = "a"; + + // #proxy-dc + ActorRef<ShardingEnvelope<Counter.Command>> proxy = + ClusterSharding.get(system) + .init( + Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())).withDataCenter("dc2")); + // #proxy-dc + + // #proxy-dc-entityref + // it must still be started before usage + ClusterSharding.get(system) + .init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())).withDataCenter("dc2")); + + EntityRef<Counter.Command> entityRef = + ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2"); + // #proxy-dc-entityref + } } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala index b2bbf3cf2d..0618013ee6 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala @@ -242,7 +242,7 @@ class ClusterSingletonManagerSpec val proxyDcB = system.actorOf( ClusterSingletonProxy.props( singletonManagerPath = "/user/consumer", - settings = ClusterSingletonProxySettings(system).withRole("worker").withDataCenter("B")), + settings = ClusterSingletonProxySettings(system).withDataCenter("B")), name = "consumerProxyDcB") //#create-singleton-proxy-dc proxyDcB diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala index 2d33248dd5..f8f5ac6264 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterSingletonSpec.scala @@ -41,7 +41,7 @@ abstract class MultiDcClusterSingletonSpec extends MultiNodeSpec(MultiDcClusterSingletonSpecConfig) with MultiNodeTypedClusterSpec { - import MultiDcClusterActors._ + import MultiDcPinger._ import MultiDcClusterSingletonSpecConfig._ "A cluster with multiple data centers" must { "be able to create and ping singleton in same DC" in { runOn(first) { val singleton = ClusterSingleton(typedSystem) - val pinger = singleton.init(SingletonActor(multiDcPinger, "ping").withStopMessage(NoMore)) + val pinger = singleton.init(SingletonActor(MultiDcPinger(), "ping").withStopMessage(NoMore)) val probe = TestProbe[Pong] pinger ! 
Ping(probe.ref) probe.expectMessage(Pong("dc1")) @@ -79,7 +79,7 @@ abstract class MultiDcClusterSingletonSpec runOn(second) { val singleton = ClusterSingleton(system.toTyped) val pinger = singleton.init( - SingletonActor(multiDcPinger, "ping") + SingletonActor(MultiDcPinger(), "ping") .withStopMessage(NoMore) .withSettings(ClusterSingletonSettings(typedSystem).withDataCenter("dc1"))) val probe = TestProbe[Pong] @@ -93,7 +93,7 @@ abstract class MultiDcClusterSingletonSpec "be able to target singleton with the same name in own dc " in { runOn(second, third) { val singleton = ClusterSingleton(typedSystem) - val pinger = singleton.init(SingletonActor(multiDcPinger, "ping").withStopMessage(NoMore)) + val pinger = singleton.init(SingletonActor(MultiDcPinger(), "ping").withStopMessage(NoMore)) val probe = TestProbe[Pong] pinger ! Ping(probe.ref) probe.expectMessage(Pong("dc2")) diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcPinger.scala similarity index 61% rename from akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala rename to akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcPinger.scala index ab0c2eaeac..af84a5f28d 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcClusterActors.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiDcPinger.scala @@ -5,18 +5,20 @@ package akka.cluster.typed import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.serialization.jackson.CborSerializable -object MultiDcClusterActors { - case class Pong(dc: String) extends CborSerializable - sealed trait PingProtocol extends CborSerializable - case class Ping(ref: ActorRef[Pong]) extends PingProtocol - case object NoMore extends PingProtocol +object MultiDcPinger { - val multiDcPinger = Behaviors.setup[PingProtocol] { ctx => + sealed trait Command extends CborSerializable + case class Ping(ref: ActorRef[Pong]) extends Command + case object NoMore extends Command + case class Pong(dc: String) extends CborSerializable + + def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx => val cluster = Cluster(ctx.system) - Behaviors.receiveMessage[PingProtocol] { + Behaviors.receiveMessage[Command] { case Ping(ref) => ref ! 
Pong(cluster.selfMember.dataCenter) Behaviors.same diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java index b284e0179b..7c4d63e0a7 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java @@ -24,6 +24,7 @@ import com.typesafe.config.ConfigFactory; import java.util.ArrayList; import java.util.List; +import java.util.Set; // FIXME use awaitAssert to await cluster forming like in BasicClusterExampleSpec public class BasicClusterExampleTest { // extends JUnitSuite { @@ -146,4 +147,19 @@ public class BasicClusterExampleTest { // extends JUnitSuite { } // #hasRole } + + void illustrateDcAccess() { + ActorSystem<Void> system = null; + + // #dcAccess + final Cluster cluster = Cluster.get(system); + // this node's data center + String dc = cluster.selfMember().dataCenter(); + // all known data centers + Set<String> allDc = cluster.state().getAllDataCenters(); + // a specific member's data center + Member aMember = cluster.state().getMembers().iterator().next(); + String aDc = aMember.dataCenter(); + // #dcAccess + } } diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java index 1eea0a6cf9..55a7513565 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java @@ -14,6 +14,7 @@ import java.time.Duration; // #import import akka.cluster.typed.ClusterSingleton; +import akka.cluster.typed.ClusterSingletonSettings; import akka.cluster.typed.SingletonActor; // #import @@ -117,4 +118,15 @@ public interface SingletonCompileOnlyTest { // #backoff proxy.tell(Counter.Increment.INSTANCE); // avoid unused warning } + + public static void dcProxy() { + // #create-singleton-proxy-dc + ActorRef<Counter.Command> singletonProxy = + ClusterSingleton.get(system) + .init( + SingletonActor.of(Counter.create(), "GlobalCounter") + .withSettings(ClusterSingletonSettings.create(system).withDataCenter("B"))); + // #create-singleton-proxy-dc + + } } diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala index b8ba73c017..4aaf6a8a75 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -6,6 +6,7 @@ package docs.akka.cluster.typed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.testkit.SocketUtil +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } //#cluster-imports @@ -83,6 +84,22 @@ akka { } //#hasRole } + + @silent("never used") + def illustrateDcAccess(): Unit = { + val system: ActorSystem[_] = ???
+ + //#dcAccess + val cluster = Cluster(system) + // this node's data center + val dc = cluster.selfMember.dataCenter + // all known data centers + val allDc = cluster.state.allDataCenters + // a specific member's data center + val aMember = cluster.state.members.head + val aDc = aMember.dataCenter + //#dcAccess + } } class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually with Matchers with LogCapturing { diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala index 9a5db19390..1e45c3a66d 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala @@ -6,9 +6,10 @@ package docs.akka.cluster.typed import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors - import scala.concurrent.duration._ +import akka.cluster.typed.ClusterSingletonSettings + object SingletonCompileOnlySpec { val system = ActorSystem(Behaviors.empty, "Singleton") @@ -64,4 +65,9 @@ object SingletonCompileOnlySpec { .onFailure[Exception](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.2)), "GlobalCounter")) //#backoff + + //#create-singleton-proxy-dc + val singletonProxy: ActorRef[Counter.Command] = ClusterSingleton(system).init( + SingletonActor(Counter(), "GlobalCounter").withSettings(ClusterSingletonSettings(system).withDataCenter("dc2"))) + //#create-singleton-proxy-dc } diff --git a/akka-docs/src/main/paradox/cluster-dc.md b/akka-docs/src/main/paradox/cluster-dc.md index 8d464a3b19..9abe58bb95 100644 --- a/akka-docs/src/main/paradox/cluster-dc.md +++ b/akka-docs/src/main/paradox/cluster-dc.md @@ -1,100 +1,12 @@ -# Cluster across multiple data centers +# Classic Multi-DC Cluster This chapter describes how @ref[Akka Cluster](cluster-usage.md) can be used across multiple data centers, availability zones or regions. -The reason for making the Akka Cluster aware of data center boundaries is that -communication across data centers typically has much higher latency and higher failure -rate than communication between nodes in the same data center. - -However, the grouping of nodes is not limited to the physical boundaries of data centers, -even though that is the primary use case. It could also be used as a logical grouping -for other reasons, such as isolation of certain nodes to improve stability or splitting -up a large cluster into smaller groups of nodes for better scalability. - -## Motivation - -There can be many reasons for using more than one data center, such as: - -* Redundancy to tolerate failures in one location and still be operational. -* Serve requests from a location near the user to provide better responsiveness. -* Balance the load over many servers. - -It's possible to run an ordinary Akka Cluster with default settings that spans multiple -data centers but that may result in problems like: - -* Management of Cluster membership is stalled during network partitions as described in a - separate section below. This means that nodes would not be able to be added and removed - during network partitions between data centers. -* More frequent false positive failure detection for network connections across data centers. - It's not possible to have different settings for the failure detection within vs. across - data centers. 
-* Downing/removal of nodes in the case of network partitions should typically be treated - differently for failures within vs. across data centers. For network partitions between - data centers the system should typically not down the unreachable nodes, but instead wait until it heals or - a decision is made by a human or external monitoring system. For failures within same - data center automatic, more aggressive, downing mechanisms can be employed for quick fail over. -* Quick fail over of Cluster Singleton and Cluster Sharding from one data center to another - is difficult to do in a safe way. There is a risk that singletons or sharded entities become - active on both sides of a network partition. -* Lack of location information makes it difficult to optimize communication to prefer nodes - that are close over distant nodes. E.g. a cluster aware router would be more efficient - if it would prefer routing messages to nodes in the own data center. - -To avoid some of these problems one can run a separate Akka Cluster per data center and use another -communication channel between the data centers, such as HTTP, an external message broker or -@ref[Cluster Client](cluster-singleton.md). However, many of the nice tools that are built on -top of the Cluster membership information are lost. For example, it wouldn't be possible -to use @ref[Distributed Data](distributed-data.md) across the separate clusters. - -We often recommend implementing a micro-service as one Akka Cluster. The external API of the -service would be HTTP, gRPC or a message broker, and not Akka Remoting or Cluster (see additional discussion - in the Lagom Framework docs: -@scala[[Internal and External Communication](https://www.lagomframework.com/documentation/current/scala/InternalAndExternalCommunication.html)] -@java[[Internal and External Communication](https://www.lagomframework.com/documentation/current/java/InternalAndExternalCommunication.html)]), -but the internal communication within the service that is running on several nodes would use ordinary actor -messaging or the tools based on Akka Cluster. When deploying this service to multiple data -centers it would be inconvenient if the internal communication could not use ordinary actor -messaging because it was separated into several Akka Clusters. The benefit of using Akka -messaging internally is performance as well as ease of development and reasoning about -your domain in terms of Actors. - -Therefore, it's possible to make the Akka Cluster aware of data centers so that one Akka -Cluster can span multiple data centers and still be tolerant to network partitions. - -## Defining the data centers - -The features are based on the idea that nodes can be assigned to a group of nodes -by setting the `akka.cluster.multi-data-center.self-data-center` configuration property. -A node can only belong to one data center and if nothing is specified a node will belong -to the `default` data center. - -The grouping of nodes is not limited to the physical boundaries of data centers, -even though that is the primary use case. It could also be used as a logical grouping -for other reasons, such as isolation of certain nodes to improve stability or splitting -up a large cluster into smaller groups of nodes for better scalability. +For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md). 
## Membership -Some @ref[membership transitions](typed/cluster-membership.md#membership-lifecycle) are managed by -one node called the @ref[leader](typed/cluster-concepts.md#leader). There is one leader per data center -and it is responsible for these transitions for the members within the same data center. Members of -other data centers are managed independently by the leader of the respective data center. These actions -cannot be performed while there are any unreachability observations among the nodes in the data center, -but unreachability across different data centers don't influence the progress of membership management -within a data center. Nodes can be added and removed also when there are network partitions between -data centers, which is impossible if nodes are not grouped into data centers. - -![cluster-dc.png](./images/cluster-dc.png) - -User actions like joining, leaving, and downing can be sent to any node in the cluster, -not only to the nodes in the data center of the node. Seed nodes are also global. - -The data center membership is implemented by adding the data center name prefixed with `"dc-"` to the -@ref[roles](typed/cluster.md#node-roles) of the member and thereby this information is known -by all other members in the cluster. This is an implementation detail, but it can be good to know -if you see this in log messages. - You can retrieve information about what data center a member belongs to: Scala @@ -103,58 +15,10 @@ Scala Java : @@snip [ClusterDocTest.java](/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java) { #dcAccess } -## Failure Detection - -@ref[Failure detection](typed/cluster-concepts.md#failure-detector) is performed by sending heartbeat messages -to detect if a node is unreachable. This is done more frequently and with more certainty among -the nodes in the same data center than across data centers. The failure detection across different data centers should -be interpreted as an indication of problem with the network link between the data centers. - -Two different failure detectors can be configured for these two purposes: - -* `akka.cluster.failure-detector` for failure detection within own data center -* `akka.cluster.multi-data-center.failure-detector` for failure detection across different data centers - -When @ref[subscribing to cluster events](cluster-usage.md#cluster-subscriber) the `UnreachableMember` and -`ReachableMember` events are for observations within the own data center. The same data center as where the -subscription was registered. - -For cross data center unreachability notifications you can subscribe to `UnreachableDataCenter` and `ReachableDataCenter` -events. - -Heartbeat messages for failure detection across data centers are only performed between a number of the -oldest nodes on each side. The number of nodes is configured with `akka.cluster.multi-data-center.cross-data-center-connections`. -The reason for only using a limited number of nodes is to keep the number of connections across data -centers low. The same nodes are also used for the gossip protocol when disseminating the membership -information across data centers. Within a data center all nodes are involved in gossip and failure detection. - -This influences how rolling upgrades should be performed. Don't stop all of the oldest that are used for gossip -at the same time. Stop one or a few at a time so that new nodes can take over the responsibility. -It's best to leave the oldest nodes until last. 
- -See the @ref:[failure detector](typed/cluster.md#failure-detector) for more details. +For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#membership). ## Cluster Singleton -The @ref[Cluster Singleton](cluster-singleton.md) is a singleton per data center. If you start the -`ClusterSingletonManager` on all nodes and you have defined 3 different data centers there will be -3 active singleton instances in the cluster, one in each data center. This is taken care of automatically, -but is important to be aware of. Designing the system for one singleton per data center makes it possible -for the system to be available also during network partitions between data centers. - -The reason why the singleton is per data center and not global is that membership information is not -guaranteed to be consistent across data centers when using one leader per data center and that makes it -difficult to select a single global singleton. - -If you need a global singleton you have to pick one data center to host that singleton and only start the -`ClusterSingletonManager` on nodes of that data center. If the data center is unreachable from another data center the -singleton is inaccessible, which is a reasonable trade-off when selecting consistency over availability. - -The `ClusterSingletonProxy` is by default routing messages to the singleton in the own data center, but -it can be started with a `data-center` parameter in the `ClusterSingletonProxySettings` to define that -it should route messages to a singleton located in another data center. That is useful for example when -having a global singleton in one data center and accessing it from other data centers. - This is how to create a singleton proxy for a specific data center: Scala @@ -166,32 +30,10 @@ Java If using the own data center as the `withDataCenter` parameter that would be a proxy for the singleton in the own data center, which is also the default if `withDataCenter` is not given. +For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#cluster-singleton). + ## Cluster Sharding -The coordinator in @ref[Cluster Sharding](cluster-sharding.md) is a Cluster Singleton and therefore, -as explained above, Cluster Sharding is also per data center. Each data center will have its own coordinator -and regions, isolated from other data centers. If you start an entity type with the same name on all -nodes and you have defined 3 different data centers and then send messages to the same entity id to -sharding regions in all data centers you will end up with 3 active entity instances for that entity id, -one in each data center. This is because the region/coordinator is only aware of its own data center -and will activate the entity there. It's unaware of the existence of corresponding entities in the -other data centers. - -Especially when used together with Akka Persistence that is based on the single-writer principle -it is important to avoid running the same entity at multiple locations at the same time with a -shared data store. That would result in corrupt data since the events stored by different instances -may be interleaved and would be interpreted differently in a later replay. 
For active active persistent -entities see Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) - -If you need global entities you have to pick one data center to host that entity type and only start -`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the -entities are inaccessible, which is a reasonable trade-off when selecting consistency over availability. - -The Cluster Sharding proxy is by default routing messages to the shard regions in their own data center, but -it can be started with a `data-center` parameter to define that it should route messages to a shard region -located in another data center. That is useful for example when having global entities in one data center and -accessing them from other data centers. - This is how to create a sharding proxy for a specific data center: Scala @@ -200,9 +42,4 @@ Scala Java : @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #proxy-dc } -Another way to manage global entities is to make sure that certain entity ids are located in -only one data center by routing the messages to the right region. For example, the routing function -could be that odd entity ids are routed to data center A and even entity ids to data center B. -Before sending the message to the local region actor you make the decision of which data center it should -be routed to. Messages for another data center can be sent with a sharding proxy as explained above and -messages for the own data center are sent to the local region. +For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#cluster-sharding). diff --git a/akka-docs/src/main/paradox/index-cluster.md b/akka-docs/src/main/paradox/index-cluster.md index 46b1b2ef0d..bdfc6a101a 100644 --- a/akka-docs/src/main/paradox/index-cluster.md +++ b/akka-docs/src/main/paradox/index-cluster.md @@ -15,6 +15,7 @@ For the new API see @ref[Cluster](typed/index-cluster.md). * [cluster-sharding](cluster-sharding.md) * [cluster-metrics](cluster-metrics.md) * [distributed-data](distributed-data.md) +* [cluster-dc](cluster-dc.md) * [serialization](serialization-classic.md) @@@ diff --git a/akka-docs/src/main/paradox/typed/cluster-dc.md b/akka-docs/src/main/paradox/typed/cluster-dc.md new file mode 100644 index 0000000000..ecba52e358 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/cluster-dc.md @@ -0,0 +1,228 @@ +# Multi-DC Cluster + +@@@ note +For the Akka Classic documentation of this feature see @ref:[Classic Multi-DC Cluster](../cluster-dc.md) +@@@ + +This chapter describes how @ref[Akka Cluster](cluster.md) can be used across +multiple data centers, availability zones or regions. + +The reason for making the Akka Cluster aware of data center boundaries is that +communication across data centers typically has much higher latency and higher failure +rate than communication between nodes in the same data center. + +However, the grouping of nodes is not limited to the physical boundaries of data centers, +even though that is the primary use case. It could also be used as a logical grouping +for other reasons, such as isolation of certain nodes to improve stability or splitting +up a large cluster into smaller groups of nodes for better scalability. 
+ +## Dependency + +To use Akka Cluster add the following dependency in your project: + +@@dependency[sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-cluster-typed_$scala.binary_version$ + version=$akka.version$ +} + +## Motivation + +There can be many reasons for using more than one data center, such as: + +* Redundancy to tolerate failures in one location and still be operational. +* Serve requests from a location near the user to provide better responsiveness. +* Balance the load over many servers. + +It's possible to run an ordinary Akka Cluster with default settings that spans multiple +data centers but that may result in problems like: + +* Management of Cluster membership is stalled during network partitions as described in a + separate section below. This means that nodes would not be able to be added and removed + during network partitions between data centers. +* More frequent false positive failure detection for network connections across data centers. + It's not possible to have different settings for the failure detection within vs. across + data centers. +* Downing/removal of nodes in the case of network partitions should typically be treated + differently for failures within vs. across data centers. For network partitions between + data centers the system should typically not down the unreachable nodes, but instead wait until it heals or + a decision is made by a human or external monitoring system. For failures within the same + data center, more aggressive automatic downing mechanisms can be employed for quick fail over. +* Quick fail over of Cluster Singleton and Cluster Sharding from one data center to another + is difficult to do in a safe way. There is a risk that singletons or sharded entities become + active on both sides of a network partition. +* Lack of location information makes it difficult to optimize communication to prefer nodes + that are close over distant nodes. E.g. a cluster aware router would be more efficient + if it preferred routing messages to nodes in its own data center. + +To avoid some of these problems one can run a separate Akka Cluster per data center and use another +communication channel between the data centers, such as HTTP or an external message broker. +However, many of the nice tools that are built on top of the Cluster membership information are lost. +For example, it wouldn't be possible to use @ref[Distributed Data](distributed-data.md) across the separate clusters. + +We often recommend implementing a micro-service as one Akka Cluster. The external API of the +service would be HTTP, gRPC or a message broker, and not Akka Remoting or Cluster (see additional discussion +in @ref:[When and where to use Akka Cluster](choosing-cluster.md)). + +The internal communication within the service that is running on several nodes would use ordinary actor +messaging or the tools based on Akka Cluster. When deploying this service to multiple data +centers it would be inconvenient if the internal communication could not use ordinary actor +messaging because it was separated into several Akka Clusters. The benefit of using Akka +messaging internally is performance as well as ease of development and reasoning about +your domain in terms of Actors. + +Therefore, it's possible to make the Akka Cluster aware of data centers so that one Akka +Cluster can span multiple data centers and still be tolerant to network partitions.
+ +## Defining the data centers + +The features are based on the idea that nodes can be assigned to a group of nodes +by setting the `akka.cluster.multi-data-center.self-data-center` configuration property. +A node can only belong to one data center and if nothing is specified a node will belong +to the `default` data center. + +The grouping of nodes is not limited to the physical boundaries of data centers, +even though that is the primary use case. It could also be used as a logical grouping +for other reasons, such as isolation of certain nodes to improve stability or splitting +up a large cluster into smaller groups of nodes for better scalability. + +## Membership + +Some @ref[membership transitions](cluster-membership.md#membership-lifecycle) are managed by +one node called the @ref[leader](cluster-concepts.md#leader). There is one leader per data center +and it is responsible for these transitions for the members within the same data center. Members of +other data centers are managed independently by the leader of the respective data center. These actions +cannot be performed while there are any unreachability observations among the nodes in the data center, +but unreachability across different data centers doesn't influence the progress of membership management +within a data center. Nodes can also be added and removed when there are network partitions between +data centers, which is impossible if nodes are not grouped into data centers. + +![cluster-dc.png](../images/cluster-dc.png) + +User actions like joining, leaving, and downing can be sent to any node in the cluster, +not only to the nodes in the data center of the node. Seed nodes are also global. + +The data center membership is implemented by adding the data center name prefixed with `"dc-"` to the +@ref[roles](cluster.md#node-roles) of the member and thereby this information is known +by all other members in the cluster. This is an implementation detail, but it can be good to know +if you see this in log messages. + +You can retrieve information about what data center a member belongs to: + +Scala +: @@snip [BasicClusterExampleSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #dcAccess } + +Java +: @@snip [BasicClusterExampleTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java) { #dcAccess } + +## Failure Detection + +@ref[Failure detection](cluster-concepts.md#failure-detector) is performed by sending heartbeat messages +to detect if a node is unreachable. This is done more frequently and with more certainty among +the nodes in the same data center than across data centers. The failure detection across different data centers should +be interpreted as an indication of a problem with the network link between the data centers. + +Two different failure detectors can be configured for these two purposes (both are shown in the configuration sketch below): + +* `akka.cluster.failure-detector` for failure detection within the own data center +* `akka.cluster.multi-data-center.failure-detector` for failure detection across different data centers + +When @ref[subscribing to cluster events](cluster.md#cluster-subscriptions) the `UnreachableMember` and +`ReachableMember` events are for observations within the own data center, that is, the same data center where the +subscription was registered. + +For cross data center unreachability notifications you can subscribe to `UnreachableDataCenter` and `ReachableDataCenter` +events.
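+
+Both the data center assignment and the two failure detectors are plain configuration. The following
+is a minimal `application.conf` sketch for a node in a data center named `east`; the data center name
+and the timeout values are illustrative examples only, not recommendations:
+
+```
+akka.cluster {
+  multi-data-center {
+    # the data center this node belongs to
+    self-data-center = "east"
+    # more tolerant failure detection across data centers
+    failure-detector.acceptable-heartbeat-pause = 10 s
+  }
+  # stricter failure detection within the own data center
+  failure-detector.acceptable-heartbeat-pause = 3 s
+}
+```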
+ +Heartbeat messages for failure detection across data centers are only performed between a number of the +oldest nodes on each side. The number of nodes is configured with `akka.cluster.multi-data-center.cross-data-center-connections`. +The reason for only using a limited number of nodes is to keep the number of connections across data +centers low. The same nodes are also used for the gossip protocol when disseminating the membership +information across data centers. Within a data center all nodes are involved in gossip and failure detection. + +This influences how rolling upgrades should be performed. Don't stop all of the oldest nodes that are used for gossip +at the same time. Stop one or a few at a time so that new nodes can take over the responsibility. +It's best to leave the oldest nodes until last. + +See the @ref:[failure detector](cluster.md#failure-detector) for more details. + +## Cluster Singleton + +The @ref[Cluster Singleton](cluster-singleton.md) is a singleton per data center. If you start the +`ClusterSingletonManager` on all nodes and you have defined 3 different data centers there will be +3 active singleton instances in the cluster, one in each data center. This is taken care of automatically, +but it is important to be aware of. Designing the system for one singleton per data center makes it possible +for the system to be available even during network partitions between data centers. + +The reason why the singleton is per data center and not global is that membership information is not +guaranteed to be consistent across data centers when using one leader per data center and that makes it +difficult to select a single global singleton. + +If you need a global singleton you have to pick one data center to host that singleton and only start the +`ClusterSingletonManager` on nodes of that data center. If the data center is unreachable from another data center the +singleton is inaccessible, which is a reasonable trade-off when selecting consistency over availability. + +By default the singleton proxy routes messages to the singleton in its own data center, but +it can be started with the `withDataCenter` parameter of the `ClusterSingletonSettings` to define that +it should route messages to a singleton located in another data center. That is useful for example when +having a global singleton in one data center and accessing it from other data centers. + +This is how to create a singleton proxy for a specific data center: + +Scala +: @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #create-singleton-proxy-dc } + +Java +: @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #create-singleton-proxy-dc } + +Passing the own data center as the `withDataCenter` parameter creates a proxy for the singleton in the own data center, which +is also the default when `withDataCenter` is not given. + +## Cluster Sharding + +The coordinator in @ref[Cluster Sharding](cluster-sharding.md) is a Cluster Singleton and therefore, +as explained above, Cluster Sharding is also per data center. Each data center will have its own coordinator +and regions, isolated from other data centers.
If you start an entity type with the same name on all +nodes and you have defined 3 different data centers and then send messages to the same entity id to +sharding regions in all data centers you will end up with 3 active entity instances for that entity id, +one in each data center. This is because the region/coordinator is only aware of its own data center +and will activate the entity there. It's unaware of the existence of corresponding entities in the +other data centers. + +Especially when used together with Akka Persistence, which is based on the single-writer principle, +it is important to avoid running the same entity at multiple locations at the same time with a +shared data store. That would result in corrupt data since the events stored by different instances +may be interleaved and would be interpreted differently in a later replay. For active-active persistent +entities see Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html). + +If you need global entities you have to pick one data center to host that entity type and only start +`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the +entities are inaccessible, which is a reasonable trade-off when selecting consistency over availability. + +By default the Cluster Sharding proxy routes messages to the shard regions in its own data center, but +it can be started with the `withDataCenter` parameter to define that it should route messages to a shard region +located in another data center. That is useful for example when having global entities in one data center and +accessing them from other data centers. + +This is how to create a sharding proxy for a specific data center: + +Scala +: @@snip [MultiDcClusterShardingSpec.scala](/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala) { #proxy-dc } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #proxy-dc } + +and it can also be used with an `EntityRef`: + +Scala +: @@snip [MultiDcClusterShardingSpec.scala](/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala) { #proxy-dc-entityref } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #proxy-dc-entityref } + +Another way to manage global entities is to make sure that certain entity ids are located in +only one data center by routing the messages to the right region. For example, the routing function +could be that odd entity ids are routed to data center A and even entity ids to data center B. +Before sending the message to the local region actor you make the decision of which data center it should +be routed to. Messages for another data center can be sent with a sharding proxy as explained above and +messages for the own data center are sent to the local region. diff --git a/akka-docs/src/main/paradox/typed/cluster.md b/akka-docs/src/main/paradox/typed/cluster.md index 6d650507ec..dc9d949828 100644 --- a/akka-docs/src/main/paradox/typed/cluster.md +++ b/akka-docs/src/main/paradox/typed/cluster.md @@ -462,4 +462,4 @@ Classic Pub Sub can be used by leveraging the `.toClassic` adapters. See @ref:[Distributed Publish Subscribe in Cluster](../distributed-pub-sub.md). The API is @github[#26338](#26338). 
@@include[cluster.md](../includes/cluster.md) { #cluster-multidc } -See @ref:[Cluster Multi-DC](../cluster-dc.md). The API for multi-dc sharding is @github[#27705](#27705). +See @ref:[Cluster Multi-DC](cluster-dc.md). diff --git a/akka-docs/src/main/paradox/typed/index-cluster.md b/akka-docs/src/main/paradox/typed/index-cluster.md index c2ccccdb8e..2d5f769f84 100644 --- a/akka-docs/src/main/paradox/typed/index-cluster.md +++ b/akka-docs/src/main/paradox/typed/index-cluster.md @@ -15,11 +15,11 @@ project.description: Akka Cluster concepts, node membership service, CRDT Distri * [cluster-singleton](cluster-singleton.md) * [cluster-sharding](cluster-sharding.md) * [cluster-sharding-specification](cluster-sharding-concepts.md) +* [cluster-dc](cluster-dc.md) * [serialization](../serialization.md) * [serialization-jackson](../serialization-jackson.md) * [multi-jvm-testing](../multi-jvm-testing.md) * [multi-node-testing](../multi-node-testing.md) -* [cluster-dc](../cluster-dc.md) * [remoting-artery](../remoting-artery.md) * [remoting](../remoting.md) * [coordination](../coordination.md)