Add API for multi-dc Sharding/Singleton in Typed, #27705 (#27974)

* add withDataCenter in Entity, following the same pattern as the role
* update cluster-dc.md, split into classic and new pages
* fix bug in ClusterSharding shouldHostShard
  * `contains` on String was a substring check instead of an equality check
* update multi-dc singleton sample
Patrik Nordwall 2019-10-15 12:20:41 +02:00 committed by GitHub
parent c83d04c1f8
commit 46fcca5f39
18 changed files with 480 additions and 207 deletions


@ -283,7 +283,7 @@ final class ClusterShardingSettings(
@InternalApi
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
role.forall(cluster.selfMember.roles.contains) &&
dataCenter.forall(cluster.selfMember.dataCenter.contains)
dataCenter.forall(_ == cluster.selfMember.dataCenter)
// no withNumberOfShards because it should be defined in configuration to be able to verify same
// value on all nodes with `JoinConfigCompatChecker`


@ -11,8 +11,10 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.FutureConverters._
import akka.util.JavaDurationConverters._
import scala.concurrent.Future
import akka.actor.ActorRefProvider
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
@ -28,6 +30,7 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
@ -119,12 +122,15 @@ import akka.util.Timeout
case Some(e) => e
}).asInstanceOf[ShardingMessageExtractor[E, M]]
val settingsWithRole = entity.role.fold(settings)(settings.withRole)
val settingsWithDataCenter = entity.dataCenter.fold(settingsWithRole)(settingsWithRole.withDataCenter)
internalInit(
entity.createBehavior,
entity.entityProps,
entity.typeKey,
entity.stopMessage,
entity.role.fold(settings)(settings.withRole),
settingsWithDataCenter,
extractor,
entity.allocationStrategy)
}
@ -142,7 +148,8 @@ import akka.util.Timeout
settings = entity.settings.asScala,
messageExtractor = entity.messageExtractor.asScala,
allocationStrategy = entity.allocationStrategy.asScala,
role = entity.role.asScala))
role = entity.role.asScala,
dataCenter = entity.dataCenter.asScala))
}
private def internalInit[M, E](
@ -240,6 +247,19 @@ import akka.util.Timeout
typeKey.asInstanceOf[EntityTypeKeyImpl[M]])
}
override def entityRefFor[M](
typeKey: scaladsl.EntityTypeKey[M],
entityId: String,
dataCenter: DataCenter): scaladsl.EntityRef[M] = {
if (dataCenter == cluster.selfMember.dataCenter)
entityRefFor(typeKey, entityId)
else
new EntityRefImpl[M](
classicSharding.shardRegionProxy(typeKey.name, dataCenter),
entityId,
typeKey.asInstanceOf[EntityTypeKeyImpl[M]])
}
override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = {
new EntityRefImpl[M](
classicSharding.shardRegion(typeKey.name),
@ -247,6 +267,19 @@ import akka.util.Timeout
typeKey.asInstanceOf[EntityTypeKeyImpl[M]])
}
override def entityRefFor[M](
typeKey: javadsl.EntityTypeKey[M],
entityId: String,
dataCenter: String): javadsl.EntityRef[M] = {
if (dataCenter == cluster.selfMember.dataCenter)
entityRefFor(typeKey, entityId)
else
new EntityRefImpl[M](
classicSharding.shardRegionProxy(typeKey.name, dataCenter),
entityId,
typeKey.asInstanceOf[EntityTypeKeyImpl[M]])
}
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance


@ -178,7 +178,8 @@ abstract class ClusterSharding {
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* You have to correctly specify the type of messages the target can handle via the `typeKey`.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
@ -187,6 +188,18 @@ abstract class ClusterSharding {
*/
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/**
* Create an `ActorRef`-like reference to a specific sharded entity running in another data center.
*
* You have to correctly specify the type of messages the target can handle via the `typeKey`.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* provided `entityId`.
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String, dataCenter: String): EntityRef[M]
/**
* Actor for querying Cluster Sharding state
*/
@ -220,6 +233,7 @@ object Entity {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty())
}
@ -236,7 +250,8 @@ final class Entity[M, E] private (
val settings: Optional[ClusterShardingSettings],
val messageExtractor: Optional[ShardingMessageExtractor[E, M]],
val allocationStrategy: Optional[ShardAllocationStrategy],
val role: Optional[String]) {
val role: Optional[String],
val dataCenter: Optional[String]) {
/**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
@ -276,7 +291,8 @@ final class Entity[M, E] private (
settings,
Optional.ofNullable(newExtractor),
allocationStrategy,
role)
role,
dataCenter)
/**
* Run the Entity actors on nodes with the given role.
@ -284,6 +300,14 @@ final class Entity[M, E] private (
def withRole(role: String): Entity[M, E] =
copy(role = Optional.ofNullable(role))
/**
* The data center of the cluster nodes where the cluster sharding is running.
* If the dataCenter is not specified, the data center of the current node is used. If the given
* dataCenter does not match the data center of the current node, the `ShardRegion` will be
* started in proxy mode.
*/
def withDataCenter(newDataCenter: String): Entity[M, E] = copy(dataCenter = Optional.ofNullable(newDataCenter))
/**
* Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
@ -298,8 +322,18 @@ final class Entity[M, E] private (
entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy,
role: Optional[String] = role): Entity[M, E] = {
new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy, role)
role: Optional[String] = role,
dataCenter: Optional[String] = dataCenter): Entity[M, E] = {
new Entity(
createBehavior,
typeKey,
stopMessage,
entityProps,
settings,
messageExtractor,
allocationStrategy,
role,
dataCenter)
}
}


@ -20,6 +20,7 @@ import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
@ -178,7 +179,8 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding =
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* You have to correctly specify the type of messages the target can handle via the `typeKey`.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
@ -187,6 +189,18 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding =
*/
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/**
* Create an `ActorRef`-like reference to a specific sharded entity running in another data center.
*
* You have to correctly specify the type of messages the target can handle via the `typeKey`.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String, dataCenter: DataCenter): EntityRef[M]
/**
* Actor for querying Cluster Sharding state
*/
@ -217,7 +231,7 @@ object Entity {
*/
def apply[M](typeKey: EntityTypeKey[M])(
createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None)
new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None, None)
}
/**
@ -231,7 +245,8 @@ final class Entity[M, E] private[akka] (
val settings: Option[ClusterShardingSettings],
val messageExtractor: Option[ShardingMessageExtractor[E, M]],
val allocationStrategy: Option[ShardAllocationStrategy],
val role: Option[String]) {
val role: Option[String],
val dataCenter: Option[DataCenter]) {
/**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
@ -271,7 +286,8 @@ final class Entity[M, E] private[akka] (
settings,
Option(newExtractor),
allocationStrategy,
role)
role,
dataCenter)
/**
* Allocation strategy which decides on which nodes to allocate new shards,
@ -283,7 +299,15 @@ final class Entity[M, E] private[akka] (
/**
* Run the Entity actors on nodes with the given role.
*/
def withRole(role: String): Entity[M, E] = copy(role = Some(role))
def withRole(newRole: String): Entity[M, E] = copy(role = Some(newRole))
/**
* The data center of the cluster nodes where the cluster sharding is running.
* If the dataCenter is not specified, the data center of the current node is used. If the given
* dataCenter does not match the data center of the current node, the `ShardRegion` will be
* started in proxy mode.
*/
def withDataCenter(newDataCenter: DataCenter): Entity[M, E] = copy(dataCenter = Some(newDataCenter))
private def copy(
createBehavior: EntityContext[M] => Behavior[M] = createBehavior,
@ -292,8 +316,18 @@ final class Entity[M, E] private[akka] (
entityProps: Props = entityProps,
settings: Option[ClusterShardingSettings] = settings,
allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy,
role: Option[String] = role): Entity[M, E] = {
new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy, role)
role: Option[String] = role,
dataCenter: Option[DataCenter] = dataCenter): Entity[M, E] = {
new Entity(
createBehavior,
typeKey,
stopMessage,
entityProps,
settings,
messageExtractor,
allocationStrategy,
role,
dataCenter)
}
}


@ -9,7 +9,7 @@ import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec }
import akka.cluster.typed.{ MultiDcPinger, MultiNodeTypedClusterSpec }
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.cluster.MultiNodeClusterSpec
@ -54,9 +54,9 @@ abstract class MultiDcClusterShardingSpec
with ScalaFutures {
import MultiDcClusterShardingSpecConfig._
import MultiDcClusterActors._
import MultiDcPinger._
val typeKey = EntityTypeKey[PingProtocol]("ping")
val typeKey = EntityTypeKey[Command]("ping")
val entityId = "ping-1"
"Cluster sharding in multi dc cluster" must {
@ -66,7 +66,7 @@ abstract class MultiDcClusterShardingSpec
"init sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.init(Entity(typeKey)(_ => multiDcPinger))
val shardRegion: ActorRef[ShardingEnvelope[Command]] = sharding.init(Entity(typeKey)(_ => MultiDcPinger()))
val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(max = 15.seconds, Pong(cluster.selfMember.dataCenter))
@ -90,14 +90,45 @@ abstract class MultiDcClusterShardingSpec
enterBarrier("ask")
}
"be able to message cross dc via proxy" in {
"be able to message cross dc via proxy, defined with ClusterShardingSettings" in {
runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).init(
Entity(typeKey)(_ => multiDcPinger).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val proxy: ActorRef[ShardingEnvelope[Command]] = ClusterSharding(typedSystem).init(
Entity(typeKey)(_ => MultiDcPinger()).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val probe = TestProbe[Pong]
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(remainingOrDefault, Pong("dc2"))
}
enterBarrier("done")
enterBarrier("cross-dc-1")
}
"be able to message cross dc via proxy, defined with Entity" in {
runOn(first, second) {
val system = typedSystem
//#proxy-dc
val proxy: ActorRef[ShardingEnvelope[Command]] =
ClusterSharding(system).init(Entity(typeKey)(_ => MultiDcPinger()).withDataCenter("dc2"))
//#proxy-dc
val probe = TestProbe[Pong]
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(remainingOrDefault, Pong("dc2"))
}
enterBarrier("cross-dc-2")
}
"be able to message cross dc via proxy, defined with EntityRef" in {
runOn(first, second) {
val system = typedSystem
//#proxy-dc-entityref
// it must still be started before usage
ClusterSharding(system).init(Entity(typeKey)(_ => MultiDcPinger()).withDataCenter("dc2"))
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId, "dc2")
//#proxy-dc-entityref
val probe = TestProbe[Pong]
entityRef ! Ping(probe.ref)
probe.expectMessage(remainingOrDefault, Pong("dc2"))
}
enterBarrier("cross-dc-3")
}
}


@ -226,4 +226,26 @@ interface ShardingCompileOnlyTest {
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
// #persistence
}
public static void dataCenterExample() {
ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample");
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
String entityId = "a";
// #proxy-dc
ActorRef<ShardingEnvelope<Counter.Command>> proxy =
ClusterSharding.get(system)
.init(
Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())).withDataCenter("dc2"));
// #proxy-dc
// #proxy-dc-entityref
// it must still be started before usage
ClusterSharding.get(system)
.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())).withDataCenter("dc2"));
EntityRef<Counter.Command> entityRef =
ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2");
// #proxy-dc-entityref
}
}


@ -242,7 +242,7 @@ class ClusterSingletonManagerSpec
val proxyDcB = system.actorOf(
ClusterSingletonProxy.props(
singletonManagerPath = "/user/consumer",
settings = ClusterSingletonProxySettings(system).withRole("worker").withDataCenter("B")),
settings = ClusterSingletonProxySettings(system).withDataCenter("B")),
name = "consumerProxyDcB")
//#create-singleton-proxy-dc
proxyDcB


@ -41,7 +41,7 @@ abstract class MultiDcClusterSingletonSpec
extends MultiNodeSpec(MultiDcClusterSingletonSpecConfig)
with MultiNodeTypedClusterSpec {
import MultiDcClusterActors._
import MultiDcPinger._
import MultiDcClusterSingletonSpecConfig._
"A cluster with multiple data centers" must {
@ -64,7 +64,7 @@ abstract class MultiDcClusterSingletonSpec
"be able to create and ping singleton in same DC" in {
runOn(first) {
val singleton = ClusterSingleton(typedSystem)
val pinger = singleton.init(SingletonActor(multiDcPinger, "ping").withStopMessage(NoMore))
val pinger = singleton.init(SingletonActor(MultiDcPinger(), "ping").withStopMessage(NoMore))
val probe = TestProbe[Pong]
pinger ! Ping(probe.ref)
probe.expectMessage(Pong("dc1"))
@ -79,7 +79,7 @@ abstract class MultiDcClusterSingletonSpec
runOn(second) {
val singleton = ClusterSingleton(system.toTyped)
val pinger = singleton.init(
SingletonActor(multiDcPinger, "ping")
SingletonActor(MultiDcPinger(), "ping")
.withStopMessage(NoMore)
.withSettings(ClusterSingletonSettings(typedSystem).withDataCenter("dc1")))
val probe = TestProbe[Pong]
@ -93,7 +93,7 @@ abstract class MultiDcClusterSingletonSpec
"be able to target singleton with the same name in own dc " in {
runOn(second, third) {
val singleton = ClusterSingleton(typedSystem)
val pinger = singleton.init(SingletonActor(multiDcPinger, "ping").withStopMessage(NoMore))
val pinger = singleton.init(SingletonActor(MultiDcPinger(), "ping").withStopMessage(NoMore))
val probe = TestProbe[Pong]
pinger ! Ping(probe.ref)
probe.expectMessage(Pong("dc2"))


@ -5,18 +5,20 @@
package akka.cluster.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.serialization.jackson.CborSerializable
object MultiDcClusterActors {
case class Pong(dc: String) extends CborSerializable
sealed trait PingProtocol extends CborSerializable
case class Ping(ref: ActorRef[Pong]) extends PingProtocol
case object NoMore extends PingProtocol
object MultiDcPinger {
val multiDcPinger = Behaviors.setup[PingProtocol] { ctx =>
sealed trait Command extends CborSerializable
case class Ping(ref: ActorRef[Pong]) extends Command
case object NoMore extends Command
case class Pong(dc: String) extends CborSerializable
def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
val cluster = Cluster(ctx.system)
Behaviors.receiveMessage[PingProtocol] {
Behaviors.receiveMessage[Command] {
case Ping(ref) =>
ref ! Pong(cluster.selfMember.dataCenter)
Behaviors.same


@ -24,6 +24,7 @@ import com.typesafe.config.ConfigFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
// FIXME use awaitAssert to await cluster forming like in BasicClusterExampleSpec
public class BasicClusterExampleTest { // extends JUnitSuite {
@ -146,4 +147,19 @@ public class BasicClusterExampleTest { // extends JUnitSuite {
}
// #hasRole
}
void illustrateDcAccess() {
ActorSystem<Void> system = null;
// #dcAccess
final Cluster cluster = Cluster.get(system);
// this node's data center
String dc = cluster.selfMember().dataCenter();
// all known data centers
Set<String> allDc = cluster.state().getAllDataCenters();
// a specific member's data center
Member aMember = cluster.state().getMembers().iterator().next();
String aDc = aMember.dataCenter();
// #dcAccess
}
}


@ -14,6 +14,7 @@ import java.time.Duration;
// #import
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.ClusterSingletonSettings;
import akka.cluster.typed.SingletonActor;
// #import
@ -117,4 +118,15 @@ public interface SingletonCompileOnlyTest {
// #backoff
proxy.tell(Counter.Increment.INSTANCE); // avoid unused warning
}
public static void dcProxy() {
// #create-singleton-proxy-dc
ActorRef<Counter.Command> singletonProxy =
ClusterSingleton.get(system)
.init(
SingletonActor.of(Counter.create(), "GlobalCounter")
.withSettings(ClusterSingletonSettings.create(system).withDataCenter("B")));
// #create-singleton-proxy-dc
}
}


@ -6,6 +6,7 @@ package docs.akka.cluster.typed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.testkit.SocketUtil
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
//#cluster-imports
@ -83,6 +84,22 @@ akka {
}
//#hasRole
}
@silent("never used")
def illustrateDcAccess(): Unit = {
val system: ActorSystem[_] = ???
//#dcAccess
val cluster = Cluster(system)
// this node's data center
val dc = cluster.selfMember.dataCenter
// all known data centers
val allDc = cluster.state.allDataCenters
// a specific member's data center
val aMember = cluster.state.members.head
val aDc = aMember.dataCenter
//#dcAccess
}
}
class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually with Matchers with LogCapturing {


@ -6,9 +6,10 @@ package docs.akka.cluster.typed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
import akka.cluster.typed.ClusterSingletonSettings
object SingletonCompileOnlySpec {
val system = ActorSystem(Behaviors.empty, "Singleton")
@ -64,4 +65,9 @@ object SingletonCompileOnlySpec {
.onFailure[Exception](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.2)),
"GlobalCounter"))
//#backoff
//#create-singleton-proxy-dc
val singletonProxy: ActorRef[Counter.Command] = ClusterSingleton(system).init(
SingletonActor(Counter(), "GlobalCounter").withSettings(ClusterSingletonSettings(system).withDataCenter("dc2")))
//#create-singleton-proxy-dc
}


@ -1,100 +1,12 @@
# Cluster across multiple data centers
# Classic Multi-DC Cluster
This chapter describes how @ref[Akka Cluster](cluster-usage.md) can be used across
multiple data centers, availability zones or regions.
The reason for making the Akka Cluster aware of data center boundaries is that
communication across data centers typically has much higher latency and higher failure
rate than communication between nodes in the same data center.
However, the grouping of nodes is not limited to the physical boundaries of data centers,
even though that is the primary use case. It could also be used as a logical grouping
for other reasons, such as isolation of certain nodes to improve stability or splitting
up a large cluster into smaller groups of nodes for better scalability.
## Motivation
There can be many reasons for using more than one data center, such as:
* Redundancy to tolerate failures in one location and still be operational.
* Serve requests from a location near the user to provide better responsiveness.
* Balance the load over many servers.
It's possible to run an ordinary Akka Cluster with default settings that spans multiple
data centers but that may result in problems like:
* Management of Cluster membership is stalled during network partitions as described in a
separate section below. This means that nodes would not be able to be added and removed
during network partitions between data centers.
* More frequent false positive failure detection for network connections across data centers.
It's not possible to have different settings for the failure detection within vs. across
data centers.
* Downing/removal of nodes in the case of network partitions should typically be treated
differently for failures within vs. across data centers. For network partitions between
data centers the system should typically not down the unreachable nodes, but instead wait until it heals or
a decision is made by a human or external monitoring system. For failures within same
data center automatic, more aggressive, downing mechanisms can be employed for quick fail over.
* Quick fail over of Cluster Singleton and Cluster Sharding from one data center to another
is difficult to do in a safe way. There is a risk that singletons or sharded entities become
active on both sides of a network partition.
* Lack of location information makes it difficult to optimize communication to prefer nodes
that are close over distant nodes. E.g. a cluster aware router would be more efficient
if it would prefer routing messages to nodes in the own data center.
To avoid some of these problems one can run a separate Akka Cluster per data center and use another
communication channel between the data centers, such as HTTP, an external message broker or
@ref[Cluster Client](cluster-singleton.md). However, many of the nice tools that are built on
top of the Cluster membership information are lost. For example, it wouldn't be possible
to use @ref[Distributed Data](distributed-data.md) across the separate clusters.
We often recommend implementing a micro-service as one Akka Cluster. The external API of the
service would be HTTP, gRPC or a message broker, and not Akka Remoting or Cluster (see additional discussion
in the Lagom Framework docs:
@scala[[Internal and External Communication](https://www.lagomframework.com/documentation/current/scala/InternalAndExternalCommunication.html)]
@java[[Internal and External Communication](https://www.lagomframework.com/documentation/current/java/InternalAndExternalCommunication.html)]),
but the internal communication within the service that is running on several nodes would use ordinary actor
messaging or the tools based on Akka Cluster. When deploying this service to multiple data
centers it would be inconvenient if the internal communication could not use ordinary actor
messaging because it was separated into several Akka Clusters. The benefit of using Akka
messaging internally is performance as well as ease of development and reasoning about
your domain in terms of Actors.
Therefore, it's possible to make the Akka Cluster aware of data centers so that one Akka
Cluster can span multiple data centers and still be tolerant to network partitions.
## Defining the data centers
The features are based on the idea that nodes can be assigned to a group of nodes
by setting the `akka.cluster.multi-data-center.self-data-center` configuration property.
A node can only belong to one data center and if nothing is specified a node will belong
to the `default` data center.
The grouping of nodes is not limited to the physical boundaries of data centers,
even though that is the primary use case. It could also be used as a logical grouping
for other reasons, such as isolation of certain nodes to improve stability or splitting
up a large cluster into smaller groups of nodes for better scalability.
For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md).
## Membership
Some @ref[membership transitions](typed/cluster-membership.md#membership-lifecycle) are managed by
one node called the @ref[leader](typed/cluster-concepts.md#leader). There is one leader per data center
and it is responsible for these transitions for the members within the same data center. Members of
other data centers are managed independently by the leader of the respective data center. These actions
cannot be performed while there are any unreachability observations among the nodes in the data center,
but unreachability across different data centers don't influence the progress of membership management
within a data center. Nodes can be added and removed also when there are network partitions between
data centers, which is impossible if nodes are not grouped into data centers.
![cluster-dc.png](./images/cluster-dc.png)
User actions like joining, leaving, and downing can be sent to any node in the cluster,
not only to the nodes in the data center of the node. Seed nodes are also global.
The data center membership is implemented by adding the data center name prefixed with `"dc-"` to the
@ref[roles](typed/cluster.md#node-roles) of the member and thereby this information is known
by all other members in the cluster. This is an implementation detail, but it can be good to know
if you see this in log messages.
You can retrieve information about what data center a member belongs to:
Scala
@ -103,58 +15,10 @@ Scala
Java
: @@snip [ClusterDocTest.java](/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java) { #dcAccess }
## Failure Detection
@ref[Failure detection](typed/cluster-concepts.md#failure-detector) is performed by sending heartbeat messages
to detect if a node is unreachable. This is done more frequently and with more certainty among
the nodes in the same data center than across data centers. The failure detection across different data centers should
be interpreted as an indication of problem with the network link between the data centers.
Two different failure detectors can be configured for these two purposes:
* `akka.cluster.failure-detector` for failure detection within own data center
* `akka.cluster.multi-data-center.failure-detector` for failure detection across different data centers
When @ref[subscribing to cluster events](cluster-usage.md#cluster-subscriber) the `UnreachableMember` and
`ReachableMember` events are for observations within the own data center. The same data center as where the
subscription was registered.
For cross data center unreachability notifications you can subscribe to `UnreachableDataCenter` and `ReachableDataCenter`
events.
Heartbeat messages for failure detection across data centers are only performed between a number of the
oldest nodes on each side. The number of nodes is configured with `akka.cluster.multi-data-center.cross-data-center-connections`.
The reason for only using a limited number of nodes is to keep the number of connections across data
centers low. The same nodes are also used for the gossip protocol when disseminating the membership
information across data centers. Within a data center all nodes are involved in gossip and failure detection.
This influences how rolling upgrades should be performed. Don't stop all of the oldest that are used for gossip
at the same time. Stop one or a few at a time so that new nodes can take over the responsibility.
It's best to leave the oldest nodes until last.
See the @ref:[failure detector](typed/cluster.md#failure-detector) for more details.
For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#membership).
## Cluster Singleton
The @ref[Cluster Singleton](cluster-singleton.md) is a singleton per data center. If you start the
`ClusterSingletonManager` on all nodes and you have defined 3 different data centers there will be
3 active singleton instances in the cluster, one in each data center. This is taken care of automatically,
but is important to be aware of. Designing the system for one singleton per data center makes it possible
for the system to be available also during network partitions between data centers.
The reason why the singleton is per data center and not global is that membership information is not
guaranteed to be consistent across data centers when using one leader per data center and that makes it
difficult to select a single global singleton.
If you need a global singleton you have to pick one data center to host that singleton and only start the
`ClusterSingletonManager` on nodes of that data center. If the data center is unreachable from another data center the
singleton is inaccessible, which is a reasonable trade-off when selecting consistency over availability.
The `ClusterSingletonProxy` is by default routing messages to the singleton in the own data center, but
it can be started with a `data-center` parameter in the `ClusterSingletonProxySettings` to define that
it should route messages to a singleton located in another data center. That is useful for example when
having a global singleton in one data center and accessing it from other data centers.
This is how to create a singleton proxy for a specific data center:
Scala
@ -166,32 +30,10 @@ Java
If using the own data center as the `withDataCenter` parameter that would be a proxy for the singleton in the own data center, which
is also the default if `withDataCenter` is not given.
For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#cluster-singleton).
## Cluster Sharding
The coordinator in @ref[Cluster Sharding](cluster-sharding.md) is a Cluster Singleton and therefore,
as explained above, Cluster Sharding is also per data center. Each data center will have its own coordinator
and regions, isolated from other data centers. If you start an entity type with the same name on all
nodes and you have defined 3 different data centers and then send messages to the same entity id to
sharding regions in all data centers you will end up with 3 active entity instances for that entity id,
one in each data center. This is because the region/coordinator is only aware of its own data center
and will activate the entity there. It's unaware of the existence of corresponding entities in the
other data centers.
Especially when used together with Akka Persistence that is based on the single-writer principle
it is important to avoid running the same entity at multiple locations at the same time with a
shared data store. That would result in corrupt data since the events stored by different instances
may be interleaved and would be interpreted differently in a later replay. For active active persistent
entities see Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
If you need global entities you have to pick one data center to host that entity type and only start
`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the
entities are inaccessible, which is a reasonable trade-off when selecting consistency over availability.
The Cluster Sharding proxy is by default routing messages to the shard regions in their own data center, but
it can be started with a `data-center` parameter to define that it should route messages to a shard region
located in another data center. That is useful for example when having global entities in one data center and
accessing them from other data centers.
This is how to create a sharding proxy for a specific data center:
Scala
@ -200,9 +42,4 @@ Scala
Java
: @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #proxy-dc }
Another way to manage global entities is to make sure that certain entity ids are located in
only one data center by routing the messages to the right region. For example, the routing function
could be that odd entity ids are routed to data center A and even entity ids to data center B.
Before sending the message to the local region actor you make the decision of which data center it should
be routed to. Messages for another data center can be sent with a sharding proxy as explained above and
messages for the own data center are sent to the local region.
For the full documentation of this feature and for new projects see @ref:[Multi-DC Cluster](typed/cluster-dc.md#cluster-sharding).


@ -15,6 +15,7 @@ For the new API see @ref[Cluster](typed/index-cluster.md).
* [cluster-sharding](cluster-sharding.md)
* [cluster-metrics](cluster-metrics.md)
* [distributed-data](distributed-data.md)
* [cluster-dc](cluster-dc.md)
* [serialization](serialization-classic.md)
@@@


@ -0,0 +1,228 @@
# Multi-DC Cluster
@@@ note
For the Akka Classic documentation of this feature see @ref:[Classic Multi-DC Cluster](../cluster-dc.md).
@@@
This chapter describes how @ref[Akka Cluster](cluster.md) can be used across
multiple data centers, availability zones or regions.
The reason for making the Akka Cluster aware of data center boundaries is that
communication across data centers typically has much higher latency and higher failure
rate than communication between nodes in the same data center.
However, the grouping of nodes is not limited to the physical boundaries of data centers,
even though that is the primary use case. It could also be used as a logical grouping
for other reasons, such as isolation of certain nodes to improve stability or splitting
up a large cluster into smaller groups of nodes for better scalability.
## Dependency
To use Akka Cluster add the following dependency in your project:
@@dependency[sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster-typed_$scala.binary_version$
version=$akka.version$
}
## Motivation
There can be many reasons for using more than one data center, such as:
* Redundancy to tolerate failures in one location and still be operational.
* Serve requests from a location near the user to provide better responsiveness.
* Balance the load over many servers.
It's possible to run an ordinary Akka Cluster with default settings that spans multiple
data centers but that may result in problems like:
* Management of Cluster membership is stalled during network partitions as described in a
separate section below. This means that nodes would not be able to be added and removed
during network partitions between data centers.
* More frequent false positive failure detection for network connections across data centers.
It's not possible to have different settings for the failure detection within vs. across
data centers.
* Downing/removal of nodes in the case of network partitions should typically be treated
differently for failures within vs. across data centers. For network partitions between
data centers the system should typically not down the unreachable nodes, but instead wait until it heals or
a decision is made by a human or external monitoring system. For failures within same
data center automatic, more aggressive, downing mechanisms can be employed for quick fail over.
* Quick fail over of Cluster Singleton and Cluster Sharding from one data center to another
is difficult to do in a safe way. There is a risk that singletons or sharded entities become
active on both sides of a network partition.
* Lack of location information makes it difficult to optimize communication to prefer nodes
that are close over distant nodes. For example, a cluster aware router would be more efficient
if it preferred routing messages to nodes in its own data center.
To avoid some of these problems one can run a separate Akka Cluster per data center and use another
communication channel between the data centers, such as HTTP or an external message broker.
However, many of the nice tools that are built on top of the Cluster membership information are lost.
For example, it wouldn't be possible to use @ref[Distributed Data](distributed-data.md) across the separate clusters.
We often recommend implementing a micro-service as one Akka Cluster. The external API of the
service would be HTTP, gRPC or a message broker, and not Akka Remoting or Cluster (see additional discussion
in @ref:[When and where to use Akka Cluster](choosing-cluster.md)).
The internal communication within the service that is running on several nodes would use ordinary actor
messaging or the tools based on Akka Cluster. When deploying this service to multiple data
centers it would be inconvenient if the internal communication could not use ordinary actor
messaging because it was separated into several Akka Clusters. The benefit of using Akka
messaging internally is performance as well as ease of development and reasoning about
your domain in terms of Actors.
Therefore, it's possible to make the Akka Cluster aware of data centers so that one Akka
Cluster can span multiple data centers and still be tolerant to network partitions.
## Defining the data centers
The features are based on the idea that nodes can be assigned to a group of nodes
by setting the `akka.cluster.multi-data-center.self-data-center` configuration property.
A node can only belong to one data center and if nothing is specified a node will belong
to the `default` data center.
The grouping of nodes is not limited to the physical boundaries of data centers,
even though that is the primary use case. It could also be used as a logical grouping
for other reasons, such as isolation of certain nodes to improve stability or splitting
up a large cluster into smaller groups of nodes for better scalability.
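As a minimal sketch (not taken from the project snippets), a node can be assigned to a data center when the `ActorSystem` is started; the data center name `eu-west` and the system name are placeholders:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

// assign this node to the "eu-west" data center; the name is an arbitrary label
val config = ConfigFactory.parseString("""
    akka.actor.provider = cluster
    akka.cluster.multi-data-center.self-data-center = "eu-west"
    """).withFallback(ConfigFactory.load())

val system = ActorSystem(Behaviors.empty, "ClusterSystem", config)
```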
## Membership
Some @ref[membership transitions](cluster-membership.md#membership-lifecycle) are managed by
one node called the @ref[leader](cluster-concepts.md#leader). There is one leader per data center
and it is responsible for these transitions for the members within the same data center. Members of
other data centers are managed independently by the leader of the respective data center. These actions
cannot be performed while there are any unreachability observations among the nodes in the data center,
but unreachability across different data centers doesn't influence the progress of membership management
within a data center. Nodes can be added and removed also when there are network partitions between
data centers, which is impossible if nodes are not grouped into data centers.
![cluster-dc.png](../images/cluster-dc.png)
User actions like joining, leaving, and downing can be sent to any node in the cluster,
not only to the nodes in the data center of the node. Seed nodes are also global.
The data center membership is implemented by adding the data center name prefixed with `"dc-"` to the
@ref[roles](cluster.md#node-roles) of the member and thereby this information is known
by all other members in the cluster. This is an implementation detail, but it can be good to know
if you see this in log messages.
You can retrieve information about what data center a member belongs to:
Scala
: @@snip [BasicClusterExampleSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #dcAccess }
Java
: @@snip [BasicClusterExampleTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java) { #dcAccess }
## Failure Detection
@ref[Failure detection](cluster-concepts.md#failure-detector) is performed by sending heartbeat messages
to detect if a node is unreachable. This is done more frequently and with more certainty among
the nodes in the same data center than across data centers. The failure detection across different data centers should
be interpreted as an indication of a problem with the network link between the data centers.
Two different failure detectors can be configured for these two purposes:
* `akka.cluster.failure-detector` for failure detection within own data center
* `akka.cluster.multi-data-center.failure-detector` for failure detection across different data centers
When @ref[subscribing to cluster events](cluster.md#cluster-subscriptions) the `UnreachableMember` and
`ReachableMember` events are for observations within the own data center, that is, the same data center
where the subscription was registered.
For cross data center unreachability notifications you can subscribe to `UnreachableDataCenter` and `ReachableDataCenter`
events.
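As a sketch of how such a subscription could look with the typed `Cluster` extension (the listener behavior and its logging below are illustrative only, not part of this commit):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ ClusterDomainEvent, ReachableDataCenter, UnreachableDataCenter }
import akka.cluster.typed.{ Cluster, Subscribe }

// illustrative listener that logs cross data center reachability changes
def dcReachabilityListener(): Behavior[ClusterDomainEvent] =
  Behaviors.setup { context =>
    val cluster = Cluster(context.system)
    cluster.subscriptions ! Subscribe(context.self, classOf[UnreachableDataCenter])
    cluster.subscriptions ! Subscribe(context.self, classOf[ReachableDataCenter])

    Behaviors.receiveMessage {
      case UnreachableDataCenter(dc) =>
        context.log.info("Data center [{}] is unreachable", dc)
        Behaviors.same
      case ReachableDataCenter(dc) =>
        context.log.info("Data center [{}] is reachable again", dc)
        Behaviors.same
      case _ =>
        Behaviors.same
    }
  }
```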
Heartbeat messages for failure detection across data centers are only performed between a number of the
oldest nodes on each side. The number of nodes is configured with `akka.cluster.multi-data-center.cross-data-center-connections`.
The reason for only using a limited number of nodes is to keep the number of connections across data
centers low. The same nodes are also used for the gossip protocol when disseminating the membership
information across data centers. Within a data center all nodes are involved in gossip and failure detection.
This influences how rolling upgrades should be performed. Don't stop all of the oldest nodes that are used for gossip
at the same time. Stop one or a few at a time so that new nodes can take over the responsibility.
It's best to leave the oldest nodes until last.
See the @ref:[failure detector](cluster.md#failure-detector) for more details.
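For reference, a sketch of how the settings mentioned in this section could be tuned together; the values are only illustrative, and the failure detector sub-keys are an assumption based on the standard failure detector layout:

```scala
import com.typesafe.config.ConfigFactory

// illustrative values; tune for the actual latency between your data centers
val multiDcTuning = ConfigFactory.parseString("""
    akka.cluster.multi-data-center {
      # how many of the oldest nodes per data center take part in
      # cross data center gossip and failure detection
      cross-data-center-connections = 5
      # more relaxed failure detection across data centers
      failure-detector {
        heartbeat-interval = 3s
        acceptable-heartbeat-pause = 10s
      }
    }
    """)
```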
## Cluster Singleton
The @ref[Cluster Singleton](cluster-singleton.md) is a singleton per data center. If you start the
`ClusterSingletonManager` on all nodes and you have defined 3 different data centers there will be
3 active singleton instances in the cluster, one in each data center. This is taken care of automatically,
but is important to be aware of. Designing the system for one singleton per data center makes it possible
for the system to be available also during network partitions between data centers.
The reason why the singleton is per data center and not global is that membership information is not
guaranteed to be consistent across data centers when using one leader per data center and that makes it
difficult to select a single global singleton.
If you need a global singleton you have to pick one data center to host that singleton and only start the
`ClusterSingletonManager` on nodes of that data center. If the data center is unreachable from another data center the
singleton is inaccessible, which is a reasonable trade-off when selecting consistency over availability.
The singleton proxy routes messages to the singleton in its own data center by default, but
it can be started with `withDataCenter` in the `ClusterSingletonSettings` to define that
it should route messages to a singleton located in another data center. That is useful for example when
having a global singleton in one data center and accessing it from other data centers.
This is how to create a singleton proxy for a specific data center:
Scala
: @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #create-singleton-proxy-dc }
Java
: @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #create-singleton-proxy-dc }
If the own data center is given as the `withDataCenter` parameter, the proxy targets the singleton in the own
data center, which is also the default when `withDataCenter` is not given.
## Cluster Sharding
The coordinator in @ref[Cluster Sharding](cluster-sharding.md) is a Cluster Singleton and therefore,
as explained above, Cluster Sharding is also per data center. Each data center will have its own coordinator
and regions, isolated from other data centers. If you start an entity type with the same name on all
nodes and you have defined 3 different data centers and then send messages to the same entity id to
sharding regions in all data centers you will end up with 3 active entity instances for that entity id,
one in each data center. This is because the region/coordinator is only aware of its own data center
and will activate the entity there. It's unaware of the existence of corresponding entities in the
other data centers.
Especially when used together with Akka Persistence that is based on the single-writer principle
it is important to avoid running the same entity at multiple locations at the same time with a
shared data store. That would result in corrupt data since the events stored by different instances
may be interleaved and would be interpreted differently in a later replay. For active-active persistent
entities, see Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html).
If you need global entities you have to pick one data center to host that entity type and only start
`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the
entities are inaccessible, which is a reasonable trade-off when selecting consistency over availability.
The Cluster Sharding proxy routes messages to the shard regions in its own data center by default, but
it can be started with `withDataCenter` to define that it should route messages to a shard region
located in another data center. That is useful for example when having global entities in one data center and
accessing them from other data centers.
This is how to create a sharding proxy for a specific data center:
Scala
: @@snip [MultiDcClusterShardingSpec.scala](/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala) { #proxy-dc }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #proxy-dc }
and it can also be used with an `EntityRef`:
Scala
: @@snip [MultiDcClusterShardingSpec.scala](/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala) { #proxy-dc-entityref }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #proxy-dc-entityref }
Another way to manage global entities is to make sure that certain entity ids are located in
only one data center by routing the messages to the right region. For example, the routing function
could be that odd entity ids are routed to data center A and even entity ids to data center B.
Before sending the message to the local region actor you make the decision of which data center it should
be routed to. Messages for another data center can be sent with a sharding proxy as explained above and
messages for the own data center are sent to the local region.
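A sketch of such a routing function, assuming two hypothetical data centers named `dcA` and `dcB`, and assuming the entity type has already been initialized (with `withDataCenter` proxies where needed) on the calling node:

```scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, EntityRef, EntityTypeKey }

// route even entity ids to dcA and odd ids to dcB; entityRefFor with the own
// data center uses the local region, otherwise a proxy to the other data center
def entityRefByDataCenter[M](
    system: ActorSystem[_],
    typeKey: EntityTypeKey[M],
    entityId: String): EntityRef[M] = {
  val targetDc = if (entityId.hashCode % 2 == 0) "dcA" else "dcB"
  ClusterSharding(system).entityRefFor(typeKey, entityId, targetDc)
}
```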


@ -462,4 +462,4 @@ Classic Pub Sub can be used by leveraging the `.toClassic` adapters.
See @ref:[Distributed Publish Subscribe in Cluster](../distributed-pub-sub.md). The API is @github[#26338](#26338).
@@include[cluster.md](../includes/cluster.md) { #cluster-multidc }
See @ref:[Cluster Multi-DC](../cluster-dc.md). The API for multi-dc sharding is @github[#27705](#27705).
See @ref:[Cluster Multi-DC](cluster-dc.md).


@ -15,11 +15,11 @@ project.description: Akka Cluster concepts, node membership service, CRDT Distri
* [cluster-singleton](cluster-singleton.md)
* [cluster-sharding](cluster-sharding.md)
* [cluster-sharding-specification](cluster-sharding-concepts.md)
* [cluster-dc](cluster-dc.md)
* [serialization](../serialization.md)
* [serialization-jackson](../serialization-jackson.md)
* [multi-jvm-testing](../multi-jvm-testing.md)
* [multi-node-testing](../multi-node-testing.md)
* [cluster-dc](../cluster-dc.md)
* [remoting-artery](../remoting-artery.md)
* [remoting](../remoting.md)
* [coordination](../coordination.md)