diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala
index 3159d9cab6..0d21ef68ae 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala
@@ -33,7 +33,7 @@ import akka.actor.typed.Terminated
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
val additionalAskTimeout = 1.second
- def behavior(
+ def apply(
settings: dd.ReplicatorSettings,
underlyingReplicator: Option[akka.actor.ActorRef]): Behavior[SReplicator.Command] = {
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala
index 6cf9a481c0..1944f4a146 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala
@@ -29,14 +29,14 @@ object Replicator {
* The `Behavior` for the `Replicator` actor.
*/
def behavior(settings: dd.ReplicatorSettings): Behavior[Command] =
- ReplicatorBehavior.behavior(settings, underlyingReplicator = None).narrow[Command]
+ ReplicatorBehavior(settings, underlyingReplicator = None).narrow[Command]
/**
* The `Behavior` for the `Replicator` actor.
* It will use the given underlying [[akka.cluster.ddata.Replicator]]
*/
def behavior(settings: dd.ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
- ReplicatorBehavior.behavior(settings, Some(underlyingReplicator)).narrow[Command]
+ ReplicatorBehavior(settings, Some(underlyingReplicator)).narrow[Command]
@DoNotInherit trait Command extends akka.cluster.ddata.typed.scaladsl.Replicator.Command
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala
index 5bf497640c..acbee3f374 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala
@@ -20,14 +20,14 @@ object Replicator {
* The `Behavior` for the `Replicator` actor.
*/
def behavior(settings: ReplicatorSettings): Behavior[Command] =
- ReplicatorBehavior.behavior(settings, underlyingReplicator = None)
+ ReplicatorBehavior(settings, underlyingReplicator = None)
/**
* The `Behavior` for the `Replicator` actor.
* It will use the given underlying [[akka.cluster.ddata.Replicator]]
*/
def behavior(settings: ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
- ReplicatorBehavior.behavior(settings, Some(underlyingReplicator))
+ ReplicatorBehavior(settings, Some(underlyingReplicator))
type ReadConsistency = dd.Replicator.ReadConsistency
val ReadLocal = dd.Replicator.ReadLocal
diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java
index ba416cc5b7..87360f9a58 100644
--- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java
+++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java
@@ -97,6 +97,10 @@ interface ReplicatorDocSample {
this.replicatorAdapter = replicatorAdapter;
this.key = key;
+ // #selfUniqueAddress
+ final SelfUniqueAddress node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+ // #selfUniqueAddress
+
this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
// #subscribe
diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala
index f90c882547..ef0bfb72d8 100644
--- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala
+++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala
@@ -44,8 +44,10 @@ object ReplicatorDocSpec {
private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalCommand
def apply(key: GCounterKey): Behavior[Command] =
- Behaviors.setup[Command] { ctx =>
- implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress
+ Behaviors.setup[Command] { context =>
+ //#selfUniqueAddress
+ implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
+ //#selfUniqueAddress
// adapter that turns the response messages from the replicator into our own protocol
DistributedData.withReplicatorMessageAdapter[Command, GCounter] { replicatorAdapter =>
diff --git a/akka-docs/src/main/paradox/distributed-data.md b/akka-docs/src/main/paradox/distributed-data.md
index 8242afdd17..56470e58a0 100644
--- a/akka-docs/src/main/paradox/distributed-data.md
+++ b/akka-docs/src/main/paradox/distributed-data.md
@@ -36,10 +36,10 @@ that it is given the same name, started on same path, on all nodes.
Cluster members with status @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up),
will participate in Distributed Data. This means that the data will be replicated to the
-@ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) nodes with the background gossip protocol. Note that it
+`WeaklyUp` nodes with the background gossip protocol. Note that it
will not participate in any actions where the consistency mode is to read/write from all
-nodes or the majority of nodes. The @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) node is not counted
-as part of the cluster. So 3 nodes + 5 @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) is essentially a
+nodes or the majority of nodes. The `WeaklyUp` node is not counted
+as part of the cluster. So 3 nodes + 5 `WeaklyUp` is essentially a
3 node cluster as far as consistent actions are concerned.
Below is an example of an actor that schedules tick messages to itself and for each tick
@@ -246,90 +246,25 @@ types that support both updates and removals, for example `ORMap` or `ORSet`.
@@@
-### Delta-CRDT
-
-For the full documentation of this feature and for new projects see @ref:[Distributed Data Delta CRDT](typed/distributed-data.md#delta-crdt).
-
## Replicated data types
Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types.
For the full documentation of this feature and for new projects see @ref:[Distributed Data Replicated data types](typed/distributed-data.md#replicated-data-types).
+### Delta-CRDT
+
+For the full documentation of this feature and for new projects see @ref:[Distributed Data Delta CRDT](typed/distributed-data.md#delta-crdt).
+
### Custom Data Type
You can implement your own data types.
For the full documentation of this feature and for new projects see @ref:[Distributed Data custom data type](typed/distributed-data.md#custom-data-type).
-#### Serialization
-
-The data types must be serializable with an @ref:[Akka Serializer](serialization.md).
-It is highly recommended that you implement efficient serialization with Protobuf or similar
-for your custom data types. The built in data types are marked with `ReplicatedDataSerialization`
-and serialized with `akka.cluster.ddata.protobuf.ReplicatedDataSerializer`.
-
-Serialization of the data types are used in remote messages and also for creating message
-digests (SHA-1) to detect changes. Therefore it is important that the serialization is efficient
-and produce the same bytes for the same content. For example sets and maps should be sorted
-deterministically in the serialization.
-
-This is a protobuf representation of the above `TwoPhaseSet`:
-
-@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset }
-
-The serializer for the `TwoPhaseSet`:
-
-Scala
-: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }
-
-Java
-: @@snip [TwoPhaseSetSerializer.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
-
-Note that the elements of the sets are sorted so the SHA-1 digests are the same
-for the same elements.
-
-You register the serializer in configuration:
-
-Scala
-: @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #serializer-config }
-
-Java
-: @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }
-
-Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
-provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:
-
-Scala
-: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }
-
-Java
-: @@snip [TwoPhaseSetSerializerWithCompression.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
-
-The two embedded `GSet` can be serialized as illustrated above, but in general when composing
-new data types from the existing built in types it is better to make use of the existing
-serializer for those types. This can be done by declaring those as bytes fields in protobuf:
-
-@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset2 }
-
-and use the methods `otherMessageToProto` and `otherMessageFromBinary` that are provided
-by the `SerializationSupport` trait to serialize and deserialize the `GSet` instances. This
-works with any type that has a registered Akka serializer. This is how such an serializer would
-look like for the `TwoPhaseSet`:
-
-Scala
-: @@snip [TwoPhaseSetSerializer2.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }
-
-Java
-: @@snip [TwoPhaseSetSerializer2.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
-
-### Durable Storage
+## Durable Storage
For the full documentation of this feature and for new projects see @ref:[Durable Storage](typed/distributed-data.md#durable-storage).
-### CRDT Garbage
-
-For the full documentation of this feature and for new projects see @ref:[CRDT Garbage](typed/distributed-data.md#crdt-garbage).
-
## Samples
Several interesting samples are included and described in the
diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md
index 360eaf919e..8f4e67ccad 100644
--- a/akka-docs/src/main/paradox/typed/distributed-data.md
+++ b/akka-docs/src/main/paradox/typed/distributed-data.md
@@ -11,7 +11,7 @@ For the Akka Classic documentation of this feature see @ref:[Classic Distributed
## Dependency
-To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project:
+To use Akka Cluster Distributed Data, you must add the following dependency in your project:
@@dependency[sbt,Maven,Gradle] {
group=com.typesafe.akka
@@ -30,7 +30,7 @@ All data entries are spread to all nodes, or nodes with a certain role, in the c
via direct replication and gossip based dissemination. You have fine grained control
of the consistency level for reads and writes.
-The nature CRDTs makes it possible to perform updates from any node without coordination.
+The nature of CRDTs makes it possible to perform updates from any node without coordination.
Concurrent updates from different nodes will automatically be resolved by the monotonic
merge function, which all data types must provide. The state changes always converge.
Several useful data types for counters, sets, maps and registers are provided and
@@ -46,15 +46,30 @@ The @apidoc[typed.*.Replicator]
actor provides the API for interacting with the data and is accessed through the extension
@apidoc[typed.*.DistributedData].
-The messages for the replicator, such as `Replicator.Update` are defined in @apidoc[typed.*.Replicator]
-but the actual CRDTs are the
-same as in classic, for example `akka.cluster.ddata.GCounter`. This will require a @scala[implicit] `akka.cluster.ddata.SelfUniqueAddress.SelfUniqueAddress`,
-available from @scala[`implicit val node = DistributedData(system).selfUniqueAddress`]@java[SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();].
+The messages for the replicator, such as `Replicator.Update` are defined in
+@scala[`akka.cluster.ddata.typed.scaladsl.Replicator`]@java[`akka.cluster.ddata.typed.javaadsl.Replicator`]
+and the actual CRDTs are defined in the `akka.cluster.ddata` package, for example
+@apidoc[akka.cluster.ddata.GCounter]. It requires a @scala[implicit] `akka.cluster.ddata.SelfUniqueAddress`,
+available from:
+Scala
+: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala) { #selfUniqueAddress }
+
+Java
+: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java) { #selfUniqueAddress }
+
The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a
key identifying the entry and helping us know what type it has, and then use that key for every interaction with
the replicator. Each replicated data type contains a factory for defining such a key.
+Cluster members with status @ref:[WeaklyUp](cluster-membership.md#weakly-up),
+will participate in Distributed Data. This means that the data will be replicated to the
+`WeaklyUp` nodes with the background gossip protocol. Note that it
+will not participate in any actions where the consistency mode is to read/write from all
+nodes or the majority of nodes. The `WeaklyUp` node is not counted
+as part of the cluster. So 3 nodes + 5 `WeaklyUp` is essentially a
+3 node cluster as far as consistent actions are concerned.
+
This sample uses the replicated data type `GCounter` to implement a counter that can be written to on any node of the
cluster:
@@ -70,21 +85,19 @@ often more convenient to use the `ReplicatorMessageAdapter` as in the above exam
### Update
-
-For an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request, it contains five values:
+
+To modify and replicate a data value you send a `Replicator.Update` message to the local
+`Replicator`.
+
+In the above example, for an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request,
+it contains five values:
1. the @scala[`Key`]@java[`KEY`] we want to update
- 1. the data to use if as the empty state if the replicator has not seen the key before
+ 1. the data to use as the empty state if the replicator has not seen the key before
1. the @ref:[write consistency level](#write-consistency) we want for the update
1. an @scala[`ActorRef[Replicator.UpdateResponse[GCounter]]`]@java[`ActorRef>`]
to respond to when the update is completed
- 1. a function that takes a previous state and updates it, in our case by incrementing it with 1
-
-Whenever the distributed counter is updated, we cache the value so that we can answer requests about the value without
-the extra interaction with the replicator using the `GetCachedValue` command.
-
-The example also supports asking the replicator using the `GetValue` command. Note how the `replyTo` from the
-incoming message can be used when the `GetSuccess` response from the replicator is received.
+ 1. a `modify` function that takes a previous state and updates it, in our case by incrementing it with 1
@@@ div { .group-scala }
There is alternative way of constructing the function for the `Update` message:
@@ -94,11 +107,21 @@ Scala
@@@
+The current data value for the `key` of the `Update` is passed as parameter to the `modify`
+function of the `Update`. The function is supposed to return the new value of the data, which
+will then be replicated according to the given @ref:[write consistency level](#write-consistency).
+
+The `modify` function is called by the `Replicator` actor and must therefore be a pure
+function that only uses the data parameter and stable fields from enclosing scope. It must
+for example not access the `ActorContext` or mutable state of an enclosing actor.
+`Update` is intended to only be sent from an actor running in same local `ActorSystem`
+ as the `Replicator`, because the `modify` function is typically not serializable.
+
You will always see your own writes. For example if you send two `Update` messages
changing the value of the same `key`, the `modify` function of the second message will
see the change that was performed by the first `Update` message.
-As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the sender of the
+As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the `replyTo` of the
`Update` if the value was successfully replicated according to the supplied consistency
level within the supplied timeout. Otherwise a `Replicator.UpdateFailure` subclass is
sent back. Note that a `Replicator.UpdateTimeout` reply does not mean that the update completely failed
@@ -108,7 +131,10 @@ be replicated to all nodes with the gossip protocol.
### Get
To retrieve the current value of a data you send `Replicator.Get` message to the
-`Replicator`. You supply a consistency level which has the following meaning:
+`Replicator`.
+
+The example has the `GetValue` command, which is asking the replicator for current value. Note how the `replyTo` from the
+incoming message can be used when the `GetSuccess` response from the replicator is received.
@@@ div { .group-scala }
Alternative way of constructing the function for the `Get` and `Delete`:
@@ -119,22 +145,26 @@ Scala
@@@
For a `Get` you supply a @ref:[read consistency level](#read-consistency).
+
You will always read your own writes. For example if you send a `Update` message
followed by a `Get` of the same `key` the `Get` will retrieve the change that was
performed by the preceding `Update` message. However, the order of the reply messages are
not defined, i.e. in the previous example you may receive the `GetSuccess` before
the `UpdateSuccess`.
-As reply of the `Get` a `Replicator.GetSuccess` is sent to the sender of the
+As reply of the `Get` a `Replicator.GetSuccess` is sent to the `replyTo` of the
`Get` if the value was successfully retrieved according to the supplied consistency
level within the supplied timeout. Otherwise a `Replicator.GetFailure` is sent.
If the key does not exist the reply will be `Replicator.NotFound`.
### Subscribe
+Whenever the distributed counter in the example is updated, we cache the value so that we can answer
+requests about the value without the extra interaction with the replicator using the `GetCachedValue` command.
+
When we start up the actor we subscribe it to changes for our key, meaning whenever the replicator observes a change
for the counter our actor will receive a @scala[`Replicator.Changed[GCounter]`]@java[`Replicator.Changed`]. Since
-this is not a message in our protocol, we use a message transformation function to wrap it in the internal `InternalChanged`
+this is not a message in our protocol, we use a message transformation function to wrap it in the internal `InternalSubscribeResponse`
message, which is then handled in the regular message handling of the behavior, as shown in the above example.
Subscribers will be notified of changes, if there are any, based on the
configurable `akka.cluster.distributed-data.notify-subscribers-interval`.
@@ -146,7 +176,7 @@ also be de-registered with the `replicatorAdapter.unsubscribe(key)` function.
A data entry can be deleted by sending a `Replicator.Delete` message to the local
`Replicator`. As reply of the `Delete` a `Replicator.DeleteSuccess` is sent to
-the sender of the `Delete` if the value was successfully deleted according to the supplied
+the `replyTo` of the `Delete` if the value was successfully deleted according to the supplied
consistency level within the supplied timeout. Otherwise a `Replicator.ReplicationDeleteFailure`
is sent. Note that `ReplicationDeleteFailure` does not mean that the delete completely failed or
was rolled back. It may still have been replicated to some nodes, and may eventually be replicated
@@ -156,10 +186,6 @@ A deleted key cannot be reused again, but it is still recommended to delete unus
data entries because that reduces the replication overhead when new nodes join the cluster.
Subsequent `Delete`, `Update` and `Get` requests will be replied with `Replicator.DataDeleted`.
Subscribers will receive `Replicator.Deleted`.
-
-In the *Delete* message you can pass an optional request context in the same way as for the
-*Update* message, described above. For example the original sender can be passed and replied
-to after receiving and transforming *DeleteSuccess*.
@@@ warning
@@ -171,32 +197,6 @@ types that support both updates and removals, for example `ORMap` or `ORSet`.
@@@
-### Delta-CRDT
-
-[Delta State Replicated Data Types](http://arxiv.org/abs/1603.01529)
-are supported. A delta-CRDT is a way to reduce the need for sending the full state
-for updates. For example adding element `'c'` and `'d'` to set `{'a', 'b'}` would
-result in sending the delta `{'c', 'd'}` and merge that with the state on the
-receiving side, resulting in set `{'a', 'b', 'c', 'd'}`.
-
-The protocol for replicating the deltas supports causal consistency if the data type
-is marked with `RequiresCausalDeliveryOfDeltas`. Otherwise it is only eventually
-consistent. Without causal consistency it means that if elements `'c'` and `'d'` are
-added in two separate *Update* operations these deltas may occasionally be propagated
-to nodes in a different order to the causal order of the updates. For this example it
-can result in that set `{'a', 'b', 'd'}` can be seen before element 'c' is seen. Eventually
-it will be `{'a', 'b', 'c', 'd'}`.
-
-Note that the full state is occasionally also replicated for delta-CRDTs, for example when
-new nodes are added to the cluster or when deltas could not be propagated because
-of network partitions or similar problems.
-
-The the delta propagation can be disabled with configuration property:
-
-```
-akka.cluster.distributed-data.delta-crdt.enabled=off
-```
-
### Consistency
The consistency level that is supplied in the @ref:[Update](#update) and @ref:[Get](#get)
@@ -308,6 +308,17 @@ happens to be n4, n5, n6, n7, i.e. the value on n1, n2, n3 is not seen in the re
@@@
+### Running separate instances of the replicator
+
+For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles,
+it makes sense to start separate replicators, this needs to be done on all nodes, or
+the group of nodes tagged with a specific role. To do this with Distributed Data you will first
+have to start a classic `Replicator` and pass it to the `Replicator.behavior` method that takes a classic
+actor ref. All such `Replicator`s must run on the same path in the classic actor hierarchy.
+
+A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator` instead of creating
+one via the `DistributedData` extension.
+
## Replicated data types
Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types.
@@ -505,6 +516,32 @@ synchronized clocks when there is only one active writer, e.g. a Cluster Singlet
single writer should then first read current value with `ReadMajority` (or more) before
changing and writing the value with `WriteMajority` (or more).
+### Delta-CRDT
+
+[Delta State Replicated Data Types](http://arxiv.org/abs/1603.01529)
+are supported. A delta-CRDT is a way to reduce the need for sending the full state
+for updates. For example adding element `'c'` and `'d'` to set `{'a', 'b'}` would
+result in sending the delta `{'c', 'd'}` and merge that with the state on the
+receiving side, resulting in set `{'a', 'b', 'c', 'd'}`.
+
+The protocol for replicating the deltas supports causal consistency if the data type
+is marked with `RequiresCausalDeliveryOfDeltas`. Otherwise it is only eventually
+consistent. Without causal consistency it means that if elements `'c'` and `'d'` are
+added in two separate *Update* operations these deltas may occasionally be propagated
+to nodes in a different order to the causal order of the updates. For this example it
+can result in that set `{'a', 'b', 'd'}` can be seen before element 'c' is seen. Eventually
+it will be `{'a', 'b', 'c', 'd'}`.
+
+Note that the full state is occasionally also replicated for delta-CRDTs, for example when
+new nodes are added to the cluster or when deltas could not be propagated because
+of network partitions or similar problems.
+
+The the delta propagation can be disabled with configuration property:
+
+```
+akka.cluster.distributed-data.delta-crdt.enabled=off
+```
+
### Custom Data Type
You can implement your own data types. The only requirement is that it implements
@@ -544,6 +581,16 @@ This is a protobuf representation of the above `TwoPhaseSet`:
@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset }
+The serializer for the `TwoPhaseSet`:
+
+Scala
+: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }
+
+Java
+: @@snip [TwoPhaseSetSerializer.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
+
+Note that the elements of the sets are sorted so the SHA-1 digests are the same for the same elements.
+
You register the serializer in configuration:
Scala
@@ -552,21 +599,34 @@ Scala
Java
: @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }
-For a serialization example and how to use with compression,
-see @ref[](../distributed-data.md#serialization) in the documentation of the Akka Classic APIs.
+Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
+provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:
-### Running separate instances of the replicator
+Scala
+: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }
-For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles,
-it makes sense to start separate replicators, this needs to be done on all nodes, or
-the group of nodes tagged with a specific role. To do this with Distributed Data you will first
-have to start a classic `Replicator` and pass it to the `Replicator.behavior` method that takes a classic
-actor ref. All such `Replicator`s must run on the same path in the classic actor hierarchy.
+Java
+: @@snip [TwoPhaseSetSerializerWithCompression.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
-A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator` instead of creating
-one via the `DistributedData` extension.
+The two embedded `GSet` can be serialized as illustrated above, but in general when composing
+new data types from the existing built in types it is better to make use of the existing
+serializer for those types. This can be done by declaring those as bytes fields in protobuf:
-### Durable Storage
+@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset2 }
+
+and use the methods `otherMessageToProto` and `otherMessageFromBinary` that are provided
+by the `SerializationSupport` trait to serialize and deserialize the `GSet` instances. This
+works with any type that has a registered Akka serializer. This is how such an serializer would
+look like for the `TwoPhaseSet`:
+
+Scala
+: @@snip [TwoPhaseSetSerializer2.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }
+
+Java
+: @@snip [TwoPhaseSetSerializer2.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
+
+
+## Durable Storage
By default the data is only kept in memory. It is redundant since it is replicated to other nodes
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
@@ -651,16 +711,6 @@ This would be possible if a node with durable data didn't participate in the pru
be stopped for longer time than this duration and if it is joining again after this
duration its data should first be manually removed (from the lmdb directory).
-### CRDT Garbage
-
-One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
-For example a `GCounter` keeps track of one counter per node. If a `GCounter` has been updated
-from one node it will associate the identifier of that node forever. That can become a problem
-for long running systems with many cluster nodes being added and removed. To solve this problem
-the `Replicator` performs pruning of data associated with nodes that have been removed from the
-cluster. Data types that need pruning have to implement the `RemovedNodePruning` trait. See the
-API documentation of the `Replicator` for details.
-
## Limitations
There are some limitations that you should be aware of.
@@ -683,6 +733,16 @@ for example when new nodes are added to the cluster or when deltas could not be
of network partitions or similar problems. This means that you cannot have too large
data entries, because then the remote message size will be too large.
+### CRDT Garbage
+
+One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
+For example a `GCounter` keeps track of one counter per node. If a `GCounter` has been updated
+from one node it will associate the identifier of that node forever. That can become a problem
+for long running systems with many cluster nodes being added and removed. To solve this problem
+the `Replicator` performs pruning of data associated with nodes that have been removed from the
+cluster. Data types that need pruning have to implement the `RemovedNodePruning` trait. See the
+API documentation of the `Replicator` for details.
+
## Learn More about CRDTs
* [Eventually Consistent Data Structures](https://vimeo.com/43903960)
diff --git a/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java b/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java
index bf0527b2f3..02fcbd7157 100644
--- a/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java
+++ b/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java
@@ -387,7 +387,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
public void demonstrateORSet() {
// #orset
- final Cluster node = Cluster.get(system);
+ final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
final ORSet s0 = ORSet.create();
final ORSet s1 = s0.add(node, "a");
final ORSet s2 = s1.add(node, "b");