diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
index be7b312764..60d3c1a12d 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala
@@ -699,6 +699,21 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
     }
 
+    "demonstrate API for DC proxy" in within(50.seconds) {
+      runOn(sixth) {
+        // #proxy-dc
+        val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy(
+          typeName = "Counter",
+          role = None,
+          dataCenter = Some("B"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
+        // #proxy-dc
+      }
+      enterBarrier("after-dc-proxy")
+
+    }
+
     "Persistent Cluster Shards" must {
       "recover entities upon restart" in within(50.seconds) {
         runOn(third, fourth, fifth) {
diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java
index 2206a8fa65..2c480da21e 100644
--- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java
+++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java
@@ -6,6 +6,7 @@ package akka.cluster.sharding;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.Optional;
 import scala.concurrent.duration.Duration;
 
 import akka.actor.AbstractActor;
@@ -91,6 +92,15 @@ public class ClusterShardingTest {
     ClusterSharding.get(system).start("SupervisedCounter", Props.create(CounterSupervisor.class),
         settings, messageExtractor);
     //#counter-supervisor-start
+
+    //#proxy-dc
+    ActorRef counterProxyDcB =
+      ClusterSharding.get(system).startProxy(
+        "Counter",
+        Optional.empty(),
+        Optional.of("B"), // data center name
+        messageExtractor);
+    //#proxy-dc
   }
 
   public void demonstrateUsage2() {
diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala
index fe25daae83..efe3cbda1f 100644
--- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala
+++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala
@@ -228,12 +228,26 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
 
   def createSingletonProxy(): ActorRef = {
     //#create-singleton-proxy
-    system.actorOf(
+    val proxy = system.actorOf(
       ClusterSingletonProxy.props(
         singletonManagerPath = "/user/consumer",
         settings = ClusterSingletonProxySettings(system).withRole("worker")),
       name = "consumerProxy")
     //#create-singleton-proxy
+    proxy
+  }
+
+  def createSingletonProxyDc(): ActorRef = {
+    //#create-singleton-proxy-dc
+    val proxyDcB = system.actorOf(
+      ClusterSingletonProxy.props(
+        singletonManagerPath = "/user/consumer",
+        settings = ClusterSingletonProxySettings(system)
+          .withRole("worker")
+          .withDataCenter("B")),
+      name = "consumerProxyDcB")
+    //#create-singleton-proxy-dc
+    proxyDcB
   }
 
   def verifyProxyMsg(oldest: RoleName, proxyNode: RoleName, msg: Int): Unit = {
diff --git a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java
index 846460e27b..c848f27d47 100644
--- a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java
+++ b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java
@@ -5,6 +5,10 @@ package akka.cluster.singleton;
 
 import akka.actor.ActorSystem;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import akka.actor.ActorRef;
 import akka.actor.Props;
 
@@ -32,8 +36,17 @@ public class ClusterSingletonManagerTest {
     ClusterSingletonProxySettings proxySettings =
         ClusterSingletonProxySettings.create(system).withRole("worker");
 
-    system.actorOf(ClusterSingletonProxy.props("/user/consumer", proxySettings),
+    ActorRef proxy =
+      system.actorOf(ClusterSingletonProxy.props("/user/consumer", proxySettings),
         "consumerProxy");
     //#create-singleton-proxy
+
+    //#create-singleton-proxy-dc
+    ActorRef proxyDcB =
+      system.actorOf(ClusterSingletonProxy.props("/user/consumer",
+        ClusterSingletonProxySettings.create(system)
+          .withRole("worker")
+          .withDataCenter("B")), "consumerProxyDcB");
+    //#create-singleton-proxy-dc
   }
 }
diff --git a/akka-docs/src/main/paradox/images/cluster-dc.png b/akka-docs/src/main/paradox/images/cluster-dc.png
new file mode 100644
index 0000000000..40319e4579
Binary files /dev/null and b/akka-docs/src/main/paradox/images/cluster-dc.png differ
diff --git a/akka-docs/src/main/paradox/java/cluster-dc.md b/akka-docs/src/main/paradox/java/cluster-dc.md
new file mode 120000
index 0000000000..9adf7076c9
--- /dev/null
+++ b/akka-docs/src/main/paradox/java/cluster-dc.md
@@ -0,0 +1 @@
+../scala/cluster-dc.md
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/java/cluster-team.md b/akka-docs/src/main/paradox/java/cluster-team.md
deleted file mode 120000
index b25b4db61f..0000000000
--- a/akka-docs/src/main/paradox/java/cluster-team.md
+++ /dev/null
@@ -1 +0,0 @@
-../scala/cluster-team.md
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/java/index-network.md b/akka-docs/src/main/paradox/java/index-network.md
index 4e30d43d49..d0c10f605e 100644
--- a/akka-docs/src/main/paradox/java/index-network.md
+++ b/akka-docs/src/main/paradox/java/index-network.md
@@ -12,6 +12,7 @@
 * [cluster-sharding](cluster-sharding.md)
 * [cluster-metrics](cluster-metrics.md)
 * [distributed-data](distributed-data.md)
+* [cluster-dc](cluster-dc.md)
 * [remoting](remoting.md)
 * [remoting-artery](remoting-artery.md)
 * [serialization](serialization.md)
diff --git a/akka-docs/src/main/paradox/scala/cluster-dc.md b/akka-docs/src/main/paradox/scala/cluster-dc.md
new file mode 100644
index 0000000000..6461ef802e
--- /dev/null
+++ b/akka-docs/src/main/paradox/scala/cluster-dc.md
@@ -0,0 +1,191 @@
+# Cluster across multiple data centers
+
+This chapter describes how @ref[Akka Cluster](cluster-usage.md) can be used across
+multiple data centers, availability zones or regions.
+
+## Motivation
+
+There can be many reasons for using more than one data center, such as:
+
+* Redundancy to tolerate failures in one location and still be operational.
+* Serve requests from a location near the user to provide better responsiveness.
+* Balance the load over many servers.
+
+It's possible to run an ordinary Akka Cluster with default settings that spans multiple
+data centers, but that may result in problems such as:
+
+* Management of Cluster membership is stalled during network partitions, as described in a
+  separate section below. This means that nodes cannot be added or removed during
+  a network partition across data centers.
+* More frequent false positive failure detection for network connections across data centers.
+  It's not possible to have different settings for the failure detection within vs. across
+  data centers.
+* Downing/removal of nodes in the case of network partitions should typically be treated
+  differently for failures within vs. across data centers. For network partitions across
+  data centers the system should typically not down the unreachable nodes, but instead wait
+  until the partition heals or a decision is made by a human or an external monitoring system.
+  For failures within the same data center more aggressive, automatic downing mechanisms can
+  be employed for quick fail-over.
+* Quick fail-over of Cluster Singleton and Cluster Sharding from one data center to another
+  is difficult to do in a safe way. There is a risk that singletons or sharded entities become
+  active on both sides of a network partition.
+* Lack of location information makes it difficult to optimize communication to prefer nodes
+  that are close over distant nodes. For example, a cluster aware router would be more efficient
+  if it preferred routing messages to nodes in its own data center.
+
+To avoid some of these problems, one can run a separate Akka Cluster per data center and use another
+communication channel between the data centers, such as HTTP, an external message broker or
+[Cluster Client](cluster-client.md). However, many of the nice tools that are built on
+top of the Cluster membership information are then lost. For example, it wouldn't be possible
+to use [Distributed Data](distributed-data.md) across the separate clusters.
+
+We often recommend implementing a micro-service as one Akka Cluster. The external API of the
+service would be HTTP or a message broker, and not Akka Remoting or Cluster, but the internal
+communication within the service that is running on several nodes would use ordinary actor
+messaging or the tools based on Akka Cluster. When deploying this service to multiple data
+centers it would be inconvenient if the internal communication could not use ordinary actor
+messaging because it was separated into several Akka Clusters. The benefit of using Akka
+messaging internally is performance as well as ease of development and reasoning about
+your domain in terms of Actors.
+
+Therefore, it's possible to make the Akka Cluster aware of data centers so that one Akka
+Cluster can span multiple data centers and still be tolerant of network partitions.
+
+## Defining the data centers
+
+The features are based on the idea that nodes can be assigned to a group of nodes
+by setting the `akka.cluster.multi-data-center.self-data-center` configuration property.
+A node can only belong to one data center, and if nothing is specified a node will belong
+to the `default` data center.
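+
+For example, a node that should belong to a data center named `A` can be configured like this
+(the name `A` is only an illustrative value, and the property can also be overridden with a
+`-Dakka.cluster.multi-data-center.self-data-center=A` system property):
+
+```
+akka.cluster.multi-data-center {
+  # membership, singletons and sharding are grouped by this data center name;
+  # "A" is just an example value
+  self-data-center = "A"
+}
+```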
+
+The grouping of nodes is not limited to the physical boundaries of data centers,
+even though that is the primary use case. It could also be used as a logical grouping
+for other reasons, such as isolation of certain nodes to improve stability or splitting
+up a large cluster into smaller groups of nodes for better scalability.
+
+## Membership
+
+Some @ref[membership transitions](common/cluster.md#membership-lifecycle) are managed by
+one node called the @ref[leader](common/cluster.md#leader). There is one leader per data center
+and it is responsible for these transitions for the members within the same data center. Members of
+other data centers are managed independently by the leader of the respective data center. These actions
+cannot be performed while there are any unreachability observations among the nodes in the data center,
+but unreachability across different data centers doesn't influence the progress of membership management
+within a data center. Nodes can thus be added and removed even when there are network partitions between
+data centers, which would be impossible if nodes were not grouped into data centers.
+
+![cluster-dc.png](../images/cluster-dc.png)
+
+User actions like joining, leaving, and downing can be sent to any node in the cluster,
+not only to nodes in the same data center as the affected node. Seed nodes are also global.
+
+The data center membership is implemented by adding the data center name prefixed with `"dc-"` to the
+@ref[roles](cluster-usage.md#node-roles) of the member, so this information is known
+by all other members in the cluster. This is an implementation detail, but it can be good to know
+if you see this in log messages.
+
+You can retrieve information about what data center a member belongs to:
+
+Scala
+:  @@snip [ClusterDocSpec.scala]($code$/scala/docs/cluster/ClusterDocSpec.scala) { #dcAccess }
+
+Java
+:  @@snip [ClusterDocTest.java]($code$/java/jdocs/cluster/ClusterDocTest.java) { #dcAccess }
+
+## Failure Detection
+
+@ref[Failure detection](cluster-usage.md#failure-detector) is performed by sending heartbeat messages
+to detect if a node is unreachable. This is done more frequently and with more certainty among
+the nodes in the same data center than across data centers. Failure detection across different data centers
+should be interpreted as an indication of a problem with the network link between the data centers.
+
+Two different failure detectors can be configured for these two purposes:
+
+* `akka.cluster.failure-detector` for failure detection within a node's own data center
+* `akka.cluster.multi-data-center.failure-detector` for failure detection across different data centers
+
+When @ref[subscribing to cluster events](cluster-usage.md#cluster-subscriber), the `UnreachableMember` and
+`ReachableMember` events are for observations within the own data center, i.e. the data center where the
+subscription was registered.
+
+For cross data center unreachability notifications you can subscribe to `UnreachableDataCenter` and `ReachableDataCenter`
+events, as shown in the sketch below.
+
+Heartbeat messages for failure detection across data centers are only performed between a number of the
+oldest nodes on each side. The number of nodes is configured with `akka.cluster.multi-data-center.cross-data-center-connections`.
+The reason for only using a limited number of nodes is to keep the number of connections across data
+centers low. The same nodes are also used for the gossip protocol when disseminating the membership
+information across data centers. Within a data center all nodes are involved in gossip and failure detection.
+
+This influences how rolling upgrades should be performed. Don't stop all of the oldest nodes that are used
+for gossip at the same time. Stop one or a few at a time so that new nodes can take over the responsibility.
+It's best to leave the oldest nodes until last.
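+
+As an illustration, the two failure detectors and the number of cross data center connections
+could be configured like this (the values below are only examples, not recommendations):
+
+```
+akka.cluster {
+  # failure detection among nodes within a node's own data center
+  failure-detector {
+    acceptable-heartbeat-pause = 3 s
+  }
+  multi-data-center {
+    # failure detection of the network links between data centers
+    failure-detector {
+      acceptable-heartbeat-pause = 10 s
+    }
+    # number of oldest nodes per data center used for cross data center
+    # gossip and heartbeating
+    cross-data-center-connections = 5
+  }
+}
+```
+
+The cross data center reachability events can be handled by a plain subscriber actor. The following
+is only a sketch; the actor name and log messages are examples, not part of the Akka API:
+
+```scala
+import akka.actor.{ Actor, ActorLogging }
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.{ CurrentClusterState, ReachableDataCenter, UnreachableDataCenter }
+
+// example subscriber, logging cross data center reachability changes
+class DataCenterWatcher extends Actor with ActorLogging {
+  val cluster = Cluster(context.system)
+
+  override def preStart(): Unit =
+    cluster.subscribe(self, classOf[UnreachableDataCenter], classOf[ReachableDataCenter])
+
+  override def postStop(): Unit = cluster.unsubscribe(self)
+
+  def receive = {
+    case UnreachableDataCenter(dc) =>
+      log.warning("Data center [{}] is unreachable", dc)
+    case ReachableDataCenter(dc) =>
+      log.info("Data center [{}] is reachable again", dc)
+    case _: CurrentClusterState => // ignore initial state snapshot
+  }
+}
+```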
+
+## Cluster Singleton
+
+The [Cluster Singleton](cluster-singleton.md) is a singleton per data center. If you start the
+`ClusterSingletonManager` on all nodes and you have defined 3 different data centers there will be
+3 active singleton instances in the cluster, one in each data center. This is taken care of automatically,
+but it is important to be aware of. Designing the system for one singleton per data center makes it possible
+for the system to be available also during network partitions between data centers.
+
+The reason the singleton is per data center and not global is that membership information is not
+guaranteed to be consistent across data centers when using one leader per data center, and that makes it
+difficult to select a single global singleton.
+
+If you need a global singleton you have to pick one data center to host that singleton and only start the
+`ClusterSingletonManager` on nodes of that data center (see the sketch at the end of this section). If the
+data center is unreachable from another data center the singleton is inaccessible, which is a reasonable
+trade-off when selecting consistency over availability.
+
+The `ClusterSingletonProxy` routes messages to the singleton in its own data center by default, but
+it can be started with a `data-center` parameter in the `ClusterSingletonProxySettings` to define that
+it should route messages to a singleton located in another data center. That is useful for example when
+having a global singleton in one data center and accessing it from other data centers.
+
+This is how to create a singleton proxy for a specific data center:
+
+Scala
+:  @@snip [ClusterSingletonManagerSpec.scala]($akka$/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala) { #create-singleton-proxy-dc }
+
+Java
+:  @@snip [ClusterSingletonManagerTest.java]($akka$/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java) { #create-singleton-proxy-dc }
+
+If the own data center is passed as the `withDataCenter` parameter, the proxy targets the singleton in the
+own data center, which is also the default when `withDataCenter` is not given.
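+
+The following is a sketch of how a global singleton could be hosted by one data center, as described
+above. The data center name `A` and the actor names are only examples:
+
+```scala
+import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Props }
+import akka.cluster.Cluster
+import akka.cluster.singleton.{
+  ClusterSingletonManager, ClusterSingletonManagerSettings,
+  ClusterSingletonProxy, ClusterSingletonProxySettings
+}
+
+object GlobalSingleton {
+  def start(system: ActorSystem, singletonProps: Props): ActorRef = {
+    // only nodes in the example data center "A" host the singleton manager
+    if (Cluster(system).selfDataCenter == "A") {
+      system.actorOf(
+        ClusterSingletonManager.props(
+          singletonProps = singletonProps,
+          terminationMessage = PoisonPill,
+          settings = ClusterSingletonManagerSettings(system)),
+        name = "globalConsumer")
+    }
+    // all nodes, in any data center, reach it through a proxy targeting data center "A"
+    system.actorOf(
+      ClusterSingletonProxy.props(
+        singletonManagerPath = "/user/globalConsumer",
+        settings = ClusterSingletonProxySettings(system).withDataCenter("A")),
+      name = "globalConsumerProxy")
+  }
+}
+```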
+
+## Cluster Sharding
+
+The coordinator in [Cluster Sharding](cluster-sharding.md) is a Cluster Singleton and therefore,
+as explained above, Cluster Sharding is also per data center. Each data center will have its own coordinator
+and regions, isolated from other data centers. If you start an entity type with the same name on all
+nodes in 3 different data centers and then send messages for the same entity id to the sharding regions
+in all data centers, you will end up with 3 active entity instances for that entity id, one in each data center.
+
+Especially when used together with Akka Persistence, which is based on the single-writer principle,
+it is important to avoid running the same entity at multiple locations at the same time with a
+shared data store. Lightbend's Akka Team is working on a solution that will support multiple active
+entities and that will work nicely together with Cluster Sharding across multiple data centers.
+
+If you need global entities you have to pick one data center to host that entity type and only start
+`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the
+entities are inaccessible, which is a reasonable trade-off when selecting consistency over availability.
+
+The Cluster Sharding proxy routes messages to the shard regions in its own data center by default, but
+it can be started with a `data-center` parameter to define that it should route messages to a shard region
+located in another data center. That is useful for example when having global entities in one data center and
+accessing them from other data centers.
+
+This is how to create a sharding proxy for a specific data center:
+
+Scala
+:  @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #proxy-dc }
+
+Java
+:  @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #proxy-dc }
+
+Another way to manage global entities is to make sure that certain entity ids are located in
+only one data center by routing the messages to the right region. For example, the routing function
+could be that odd entity ids are routed to data center A and even entity ids to data center B.
+Before sending the message to the local region actor you make the decision of which data center it should
+be routed to. Messages for another data center can be sent with a sharding proxy as explained above, and
+messages for the own data center are sent to the local region, as shown in the sketch below.
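+
+The following sketch illustrates such a routing function. The data center names, the odd/even rule and
+the message envelope are only examples; `localRegion` and `proxyToOtherDc` are assumed to be a locally
+started shard region and a sharding proxy started for the other data center:
+
+```scala
+import akka.actor.{ ActorRef, ActorSystem }
+import akka.cluster.Cluster
+
+object EntityRouter {
+  // example envelope carrying the entity id that the extractors understand
+  final case class Envelope(entityId: Long, payload: Any)
+
+  def send(system: ActorSystem, localRegion: ActorRef, proxyToOtherDc: ActorRef,
+           msg: Envelope): Unit = {
+    // odd entity ids live in data center "A", even entity ids in data center "B"
+    val homeDc = if (msg.entityId % 2 != 0) "A" else "B"
+    val target =
+      if (homeDc == Cluster(system).selfDataCenter) localRegion
+      else proxyToOtherDc
+    target ! msg
+  }
+}
+```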
diff --git a/akka-docs/src/main/paradox/scala/cluster-team.md b/akka-docs/src/main/paradox/scala/cluster-team.md
deleted file mode 100644
index a4019d360f..0000000000
--- a/akka-docs/src/main/paradox/scala/cluster-team.md
+++ /dev/null
@@ -1,15 +0,0 @@
-# Cluster Team
-
-@@@ note
-
-Cluster teams are a work-in-progress feature, and behavior is still expected to change.
-
-@@@
-
-Teams are used to make islands of the cluster that are colocated. This can be used
-to make the cluster aware that it is running across multiple availability zones or regions.
-
-Cluster nodes can be assigned to a team by setting the `akka.cluster.team` setting.
-When no team is specified, a node will belong to the 'default' team.
-
-The team is added to the list of roles of the node with the prefix "team-".
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/scala/index-network.md b/akka-docs/src/main/paradox/scala/index-network.md
index 8e10cade76..ae4015ed67 100644
--- a/akka-docs/src/main/paradox/scala/index-network.md
+++ b/akka-docs/src/main/paradox/scala/index-network.md
@@ -12,6 +12,7 @@
 * [cluster-sharding](cluster-sharding.md)
 * [cluster-metrics](cluster-metrics.md)
 * [distributed-data](distributed-data.md)
+* [cluster-dc](cluster-dc.md)
 * [remoting](remoting.md)
 * [remoting-artery](remoting-artery.md)
 * [serialization](serialization.md)
diff --git a/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java b/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java
index cef9806af0..acb9a2ad7d 100644
--- a/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java
+++ b/akka-docs/src/test/java/jdocs/cluster/ClusterDocTest.java
@@ -5,6 +5,7 @@ package jdocs.cluster;
 
 import akka.testkit.javadsl.TestKit;
 import com.typesafe.config.ConfigFactory;
+import java.util.Set;
 import jdocs.AbstractJavaTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -12,6 +13,7 @@ import org.junit.Test;
 
 import akka.actor.ActorSystem;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 
 public class ClusterDocTest extends AbstractJavaTest {
 
@@ -38,5 +40,20 @@ public class ClusterDocTest extends AbstractJavaTest {
     //#leave
   }
 
+
+  // compile only
+  @SuppressWarnings("unused")
+  public void demonstrateDataCenter() {
+    //#dcAccess
+    final Cluster cluster = Cluster.get(system);
+    // this node's data center
+    String dc = cluster.selfDataCenter();
+    // all known data centers
+    Set<String> allDc = cluster.state().getAllDataCenters();
+    // a specific member's data center
+    Member aMember = cluster.state().getMembers().iterator().next();
+    String aDc = aMember.dataCenter();
+    //#dcAccess
+  }
 }
diff --git a/akka-docs/src/test/scala/docs/cluster/ClusterDocSpec.scala b/akka-docs/src/test/scala/docs/cluster/ClusterDocSpec.scala
index c5381cb149..78b07162b4 100644
--- a/akka-docs/src/test/scala/docs/cluster/ClusterDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/cluster/ClusterDocSpec.scala
@@ -5,6 +5,7 @@ package scala.docs.cluster
 
 import akka.cluster.Cluster
 import akka.testkit.AkkaSpec
+import docs.CompileOnlySpec
 
 object ClusterDocSpec {
 
@@ -15,13 +16,28 @@
     """
 }
 
-class ClusterDocSpec extends AkkaSpec(ClusterDocSpec.config) {
+class ClusterDocSpec extends AkkaSpec(ClusterDocSpec.config) with CompileOnlySpec {
 
-  "demonstrate leave" in {
+  "demonstrate leave" in compileOnlySpec {
     //#leave
     val cluster = Cluster(system)
     cluster.leave(cluster.selfAddress)
     //#leave
   }
+
+  "demonstrate data center" in compileOnlySpec {
+    {
+      //#dcAccess
+      val cluster = Cluster(system)
+      // this node's data center
+      val dc = cluster.selfDataCenter
+      // all known data centers
+      val allDc = cluster.state.allDataCenters
+      // a specific member's data center
+      val aMember = cluster.state.members.head
+      val aDc = aMember.dataCenter
+      //#dcAccess
+    }
+  }
+
 }