doc: improvements to Distributed Data docs (#27971)

* doc: improvements to Distributed Data docs

* leftover

* java formatting
Patrik Nordwall 2019-10-14 14:03:04 +02:00 committed by GitHub
parent b5400975e5
commit 78281ba92f
8 changed files with 154 additions and 153 deletions


@@ -33,7 +33,7 @@ import akka.actor.typed.Terminated
 val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
 val additionalAskTimeout = 1.second

-def behavior(
+def apply(
     settings: dd.ReplicatorSettings,
     underlyingReplicator: Option[akka.actor.ActorRef]): Behavior[SReplicator.Command] = {


@@ -29,14 +29,14 @@ object Replicator {
 * The `Behavior` for the `Replicator` actor.
 */
 def behavior(settings: dd.ReplicatorSettings): Behavior[Command] =
-  ReplicatorBehavior.behavior(settings, underlyingReplicator = None).narrow[Command]
+  ReplicatorBehavior(settings, underlyingReplicator = None).narrow[Command]

 /**
 * The `Behavior` for the `Replicator` actor.
 * It will use the given underlying [[akka.cluster.ddata.Replicator]]
 */
 def behavior(settings: dd.ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
-  ReplicatorBehavior.behavior(settings, Some(underlyingReplicator)).narrow[Command]
+  ReplicatorBehavior(settings, Some(underlyingReplicator)).narrow[Command]

 @DoNotInherit trait Command extends akka.cluster.ddata.typed.scaladsl.Replicator.Command


@@ -20,14 +20,14 @@ object Replicator {
 * The `Behavior` for the `Replicator` actor.
 */
 def behavior(settings: ReplicatorSettings): Behavior[Command] =
-  ReplicatorBehavior.behavior(settings, underlyingReplicator = None)
+  ReplicatorBehavior(settings, underlyingReplicator = None)

 /**
 * The `Behavior` for the `Replicator` actor.
 * It will use the given underlying [[akka.cluster.ddata.Replicator]]
 */
 def behavior(settings: ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
-  ReplicatorBehavior.behavior(settings, Some(underlyingReplicator))
+  ReplicatorBehavior(settings, Some(underlyingReplicator))

 type ReadConsistency = dd.Replicator.ReadConsistency
 val ReadLocal = dd.Replicator.ReadLocal


@@ -97,6 +97,10 @@ interface ReplicatorDocSample {
 this.replicatorAdapter = replicatorAdapter;
 this.key = key;

+// #selfUniqueAddress
+final SelfUniqueAddress node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+// #selfUniqueAddress
 this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();

 // #subscribe


@@ -44,8 +44,10 @@ object ReplicatorDocSpec {
 private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalCommand

 def apply(key: GCounterKey): Behavior[Command] =
-  Behaviors.setup[Command] { ctx =>
-    implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress
+  Behaviors.setup[Command] { context =>
+    //#selfUniqueAddress
+    implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
+    //#selfUniqueAddress

     // adapter that turns the response messages from the replicator into our own protocol
     DistributedData.withReplicatorMessageAdapter[Command, GCounter] { replicatorAdapter =>


@@ -36,10 +36,10 @@ that it is given the same name, started on same path, on all nodes.
 Cluster members with status @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up),
 will participate in Distributed Data. This means that the data will be replicated to the
-@ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) nodes with the background gossip protocol. Note that it
+`WeaklyUp` nodes with the background gossip protocol. Note that it
 will not participate in any actions where the consistency mode is to read/write from all
-nodes or the majority of nodes. The @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) node is not counted
-as part of the cluster. So 3 nodes + 5 @ref:[WeaklyUp](typed/cluster-membership.md#weakly-up) is essentially a
+nodes or the majority of nodes. The `WeaklyUp` node is not counted
+as part of the cluster. So 3 nodes + 5 `WeaklyUp` is essentially a
 3 node cluster as far as consistent actions are concerned.

 Below is an example of an actor that schedules tick messages to itself and for each tick
@@ -246,90 +246,25 @@ types that support both updates and removals, for example `ORMap` or `ORSet`.
 @@@

-### Delta-CRDT
-
-For the full documentation of this feature and for new projects see @ref:[Distributed Data Delta CRDT](typed/distributed-data.md#delta-crdt).
-
 ## Replicated data types

 Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types.

 For the full documentation of this feature and for new projects see @ref:[Distributed Data Replicated data types](typed/distributed-data.md#replicated-data-types).

+### Delta-CRDT
+
+For the full documentation of this feature and for new projects see @ref:[Distributed Data Delta CRDT](typed/distributed-data.md#delta-crdt).
+
 ### Custom Data Type

 You can implement your own data types.

 For the full documentation of this feature and for new projects see @ref:[Distributed Data custom data type](typed/distributed-data.md#custom-data-type).

-#### Serialization
-
-The data types must be serializable with an @ref:[Akka Serializer](serialization.md).
-It is highly recommended that you implement efficient serialization with Protobuf or similar
-for your custom data types. The built in data types are marked with `ReplicatedDataSerialization`
-and serialized with `akka.cluster.ddata.protobuf.ReplicatedDataSerializer`.
-
-Serialization of the data types are used in remote messages and also for creating message
-digests (SHA-1) to detect changes. Therefore it is important that the serialization is efficient
-and produce the same bytes for the same content. For example sets and maps should be sorted
-deterministically in the serialization.
-
-This is a protobuf representation of the above `TwoPhaseSet`:
-
-@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset }
-
-The serializer for the `TwoPhaseSet`:
-
-Scala
-: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }
-
-Java
-: @@snip [TwoPhaseSetSerializer.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
-
-Note that the elements of the sets are sorted so the SHA-1 digests are the same
-for the same elements.
-
-You register the serializer in configuration:
-
-Scala
-: @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #serializer-config }
-
-Java
-: @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }
-
-Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
-provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:
-
-Scala
-: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }
-
-Java
-: @@snip [TwoPhaseSetSerializerWithCompression.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
-
-The two embedded `GSet` can be serialized as illustrated above, but in general when composing
-new data types from the existing built in types it is better to make use of the existing
-serializer for those types. This can be done by declaring those as bytes fields in protobuf:
-
-@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset2 }
-
-and use the methods `otherMessageToProto` and `otherMessageFromBinary` that are provided
-by the `SerializationSupport` trait to serialize and deserialize the `GSet` instances. This
-works with any type that has a registered Akka serializer. This is how such an serializer would
-look like for the `TwoPhaseSet`:
-
-Scala
-: @@snip [TwoPhaseSetSerializer2.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }
-
-Java
-: @@snip [TwoPhaseSetSerializer2.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
-
 <a id="ddata-durable"></a>
-### Durable Storage
+## Durable Storage

 For the full documentation of this feature and for new projects see @ref:[Durable Storage](typed/distributed-data.md#durable-storage).

+### CRDT Garbage
+
+For the full documentation of this feature and for new projects see @ref:[CRDT Garbage](typed/distributed-data.md#crdt-garbage).
+
 ## Samples

 Several interesting samples are included and described in the


@@ -11,7 +11,7 @@ For the Akka Classic documentation of this feature see @ref:[Classic Distributed
 ## Dependency

-To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project:
+To use Akka Cluster Distributed Data, you must add the following dependency in your project:

 @@dependency[sbt,Maven,Gradle] {
   group=com.typesafe.akka
@@ -30,7 +30,7 @@ All data entries are spread to all nodes, or nodes with a certain role, in the c
 via direct replication and gossip based dissemination. You have fine grained control
 of the consistency level for reads and writes.

-The nature CRDTs makes it possible to perform updates from any node without coordination.
+The nature of CRDTs makes it possible to perform updates from any node without coordination.
 Concurrent updates from different nodes will automatically be resolved by the monotonic
 merge function, which all data types must provide. The state changes always converge.

 Several useful data types for counters, sets, maps and registers are provided and
@@ -46,15 +46,30 @@ The @apidoc[typed.*.Replicator]
 actor provides the API for interacting with the data and is accessed through the extension
 @apidoc[typed.*.DistributedData].

-The messages for the replicator, such as `Replicator.Update` are defined in @apidoc[typed.*.Replicator]
-but the actual CRDTs are the
-same as in classic, for example `akka.cluster.ddata.GCounter`. This will require a @scala[implicit] `akka.cluster.ddata.SelfUniqueAddress`,
-available from @scala[`implicit val node = DistributedData(system).selfUniqueAddress`]@java[SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();].
+The messages for the replicator, such as `Replicator.Update` are defined in
+@scala[`akka.cluster.ddata.typed.scaladsl.Replicator`]@java[`akka.cluster.ddata.typed.javadsl.Replicator`]
+and the actual CRDTs are defined in the `akka.cluster.ddata` package, for example
+@apidoc[akka.cluster.ddata.GCounter]. It requires a @scala[implicit] `akka.cluster.ddata.SelfUniqueAddress`,
+available from:
+
+Scala
+: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala) { #selfUniqueAddress }
+
+Java
+: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java) { #selfUniqueAddress }

 The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a
 key identifying the entry and helping us know what type it has, and then use that key for every interaction with
 the replicator. Each replicated data type contains a factory for defining such a key.
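As a tiny illustration (editorial sketch, not part of this commit), such a key for a `GCounter` entry can be created like this; the id string `"counter"` is made up, and it must be the same on every node that refers to the entry:

```scala
import akka.cluster.ddata.GCounterKey

// key identifying the replicated GCounter entry
val Key = GCounterKey("counter")
```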
+Cluster members with status @ref:[WeaklyUp](cluster-membership.md#weakly-up),
+will participate in Distributed Data. This means that the data will be replicated to the
+`WeaklyUp` nodes with the background gossip protocol. Note that it
+will not participate in any actions where the consistency mode is to read/write from all
+nodes or the majority of nodes. The `WeaklyUp` node is not counted
+as part of the cluster. So 3 nodes + 5 `WeaklyUp` is essentially a
+3 node cluster as far as consistent actions are concerned.

 This sample uses the replicated data type `GCounter` to implement a counter that can be written to on any node of the
 cluster:
@@ -70,21 +85,19 @@ often more convenient to use the `ReplicatorMessageAdapter` as in the above exam
 <a id="replicator-update"></a>
 ### Update

-For an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request, it contains five values:
+To modify and replicate a data value you send a `Replicator.Update` message to the local
+`Replicator`.
+
+In the above example, for an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request,
+it contains five values:

 1. the @scala[`Key`]@java[`KEY`] we want to update
-1. the data to use if as the empty state if the replicator has not seen the key before
+1. the data to use as the empty state if the replicator has not seen the key before
 1. the @ref:[write consistency level](#write-consistency) we want for the update
 1. an @scala[`ActorRef[Replicator.UpdateResponse[GCounter]]`]@java[`ActorRef<Replicator.UpdateResponse<GCounter>>`]
    to respond to when the update is completed
-1. a function that takes a previous state and updates it, in our case by incrementing it with 1
+1. a `modify` function that takes a previous state and updates it, in our case by incrementing it with 1

-Whenever the distributed counter is updated, we cache the value so that we can answer requests about the value without
-the extra interaction with the replicator using the `GetCachedValue` command.
-
-The example also supports asking the replicator using the `GetValue` command. Note how the `replyTo` from the
-incoming message can be used when the `GetSuccess` response from the replicator is received.

 @@@ div { .group-scala }

 There is alternative way of constructing the function for the `Update` message:
@@ -94,11 +107,21 @@ Scala
 @@@

+The current data value for the `key` of the `Update` is passed as parameter to the `modify`
+function of the `Update`. The function is supposed to return the new value of the data, which
+will then be replicated according to the given @ref:[write consistency level](#write-consistency).
+
+The `modify` function is called by the `Replicator` actor and must therefore be a pure
+function that only uses the data parameter and stable fields from enclosing scope. It must
+for example not access the `ActorContext` or mutable state of an enclosing actor.
+`Update` is intended to only be sent from an actor running in same local `ActorSystem`
+as the `Replicator`, because the `modify` function is typically not serializable.
+
 You will always see your own writes. For example if you send two `Update` messages
 changing the value of the same `key`, the `modify` function of the second message will
 see the change that was performed by the first `Update` message.

-As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the sender of the
+As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the `replyTo` of the
 `Update` if the value was successfully replicated according to the supplied consistency
 level within the supplied timeout. Otherwise a `Replicator.UpdateFailure` subclass is
 sent back. Note that a `Replicator.UpdateTimeout` reply does not mean that the update completely failed
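For orientation, here is a minimal self-contained sketch (editorial, not part of this commit) of the pattern the prose above describes: a counter actor that sends a `Replicator.Update` through the `ReplicatorMessageAdapter`. The object, message and key names are made up:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ddata.{ GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.typed.scaladsl.{ DistributedData, Replicator }

object Counter {
  sealed trait Command
  case object Increment extends Command
  private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends Command

  val Key: GCounterKey = GCounterKey("counter")

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // needed by the GCounter :+ operation in the modify function
      implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

      DistributedData.withReplicatorMessageAdapter[Command, GCounter] { replicatorAdapter =>
        Behaviors.receiveMessage {
          case Increment =>
            // the five values: key, empty state, write consistency, reply-to, modify function
            replicatorAdapter.askUpdate(
              askReplyTo => Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),
              InternalUpdateResponse.apply)
            Behaviors.same
          case InternalUpdateResponse(_) =>
            // UpdateSuccess or an UpdateFailure subclass
            Behaviors.same
        }
      }
    }
}
```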
@@ -108,7 +131,10 @@ be replicated to all nodes with the gossip protocol.
 ### Get

 To retrieve the current value of a data you send `Replicator.Get` message to the
-`Replicator`. You supply a consistency level which has the following meaning:
+`Replicator`.
+
+The example has the `GetValue` command, which is asking the replicator for current value. Note how the `replyTo` from the
+incoming message can be used when the `GetSuccess` response from the replicator is received.

 @@@ div { .group-scala }

 Alternative way of constructing the function for the `Get` and `Delete`:
@@ -119,22 +145,26 @@ Scala
 @@@

+For a `Get` you supply a @ref:[read consistency level](#read-consistency).
+
 You will always read your own writes. For example if you send a `Update` message
 followed by a `Get` of the same `key` the `Get` will retrieve the change that was
 performed by the preceding `Update` message. However, the order of the reply messages are
 not defined, i.e. in the previous example you may receive the `GetSuccess` before
 the `UpdateSuccess`.

-As reply of the `Get` a `Replicator.GetSuccess` is sent to the sender of the
+As reply of the `Get` a `Replicator.GetSuccess` is sent to the `replyTo` of the
 `Get` if the value was successfully retrieved according to the supplied consistency
 level within the supplied timeout. Otherwise a `Replicator.GetFailure` is sent.
 If the key does not exist the reply will be `Replicator.NotFound`.
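A corresponding `Get` can be sketched as hypothetical additions to the `Counter` example above (editorial, not from this commit); the `GetValue` and `InternalGetResponse` names are assumed:

```scala
import akka.actor.typed.ActorRef

case class GetValue(replyTo: ActorRef[Int]) extends Command
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
    extends Command

// cases added to the Behaviors.receiveMessage of the sketch above:
case GetValue(replyTo) =>
  replicatorAdapter.askGet(
    askReplyTo => Replicator.Get(Key, Replicator.ReadLocal, askReplyTo),
    rsp => InternalGetResponse(rsp, replyTo))
  Behaviors.same
case InternalGetResponse(rsp @ Replicator.GetSuccess(Key), replyTo) =>
  // answer the original asker with the retrieved counter value
  replyTo ! rsp.get(Key).value.toInt
  Behaviors.same
case InternalGetResponse(_, _) =>
  Behaviors.same // GetFailure or NotFound, ignored in this sketch
```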
 ### Subscribe

+Whenever the distributed counter in the example is updated, we cache the value so that we can answer
+requests about the value without the extra interaction with the replicator using the `GetCachedValue` command.
+
 When we start up the actor we subscribe it to changes for our key, meaning whenever the replicator observes a change
 for the counter our actor will receive a @scala[`Replicator.Changed[GCounter]`]@java[`Replicator.Changed<GCounter>`]. Since
-this is not a message in our protocol, we use a message transformation function to wrap it in the internal `InternalChanged`
+this is not a message in our protocol, we use a message transformation function to wrap it in the internal `InternalSubscribeResponse`
 message, which is then handled in the regular message handling of the behavior, as shown in the above example.

 Subscribers will be notified of changes, if there are any, based on the
 configurable `akka.cluster.distributed-data.notify-subscribers-interval`.
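The subscription can be sketched as further hypothetical additions to the `Counter` example (editorial, not from this commit):

```scala
private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends Command

// in Behaviors.setup, before returning the message handler:
replicatorAdapter.subscribe(Key, InternalSubscribeResponse.apply)

// and in the message handler, cache the latest value on every observed change:
case InternalSubscribeResponse(chg @ Replicator.Changed(Key)) =>
  val cachedValue = chg.get(Key).value.toInt
  counter(cachedValue) // hypothetical: return a behavior that remembers cachedValue
case InternalSubscribeResponse(Replicator.Deleted(_)) =>
  Behaviors.unhandled // no deletes of this key expected in this sketch
```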
@@ -146,7 +176,7 @@ also be de-registered with the `replicatorAdapter.unsubscribe(key)` function.
 A data entry can be deleted by sending a `Replicator.Delete` message to the local
 `Replicator`. As reply of the `Delete` a `Replicator.DeleteSuccess` is sent to
-the sender of the `Delete` if the value was successfully deleted according to the supplied
+the `replyTo` of the `Delete` if the value was successfully deleted according to the supplied
 consistency level within the supplied timeout. Otherwise a `Replicator.ReplicationDeleteFailure`
 is sent. Note that `ReplicationDeleteFailure` does not mean that the delete completely failed or
 was rolled back. It may still have been replicated to some nodes, and may eventually be replicated
@@ -156,10 +186,6 @@ A deleted key cannot be reused again, but it is still recommended to delete unus
 data entries because that reduces the replication overhead when new nodes join the cluster.
 Subsequent `Delete`, `Update` and `Get` requests will be replied with `Replicator.DataDeleted`.
 Subscribers will receive `Replicator.Deleted`.

-In the *Delete* message you can pass an optional request context in the same way as for the
-*Update* message, described above. For example the original sender can be passed and replied
-to after receiving and transforming *DeleteSuccess*.
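Deletion can likewise be sketched against the `Counter` example (editorial, not from this commit); the `Cleanup` command and `InternalDeleteResponse` names are made up:

```scala
case object Cleanup extends Command
private case class InternalDeleteResponse(rsp: Replicator.DeleteResponse[GCounter]) extends Command

// cases added to the message handler of the sketch above:
case Cleanup =>
  replicatorAdapter.askDelete(
    askReplyTo => Replicator.Delete(Key, Replicator.WriteLocal, askReplyTo),
    InternalDeleteResponse.apply)
  Behaviors.same
case InternalDeleteResponse(_) =>
  // DeleteSuccess, ReplicationDeleteFailure or DataDeleted
  Behaviors.same
```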
 @@@ warning
@@ -171,32 +197,6 @@ types that support both updates and removals, for example `ORMap` or `ORSet`.
 @@@

-### Delta-CRDT
-
-[Delta State Replicated Data Types](http://arxiv.org/abs/1603.01529)
-are supported. A delta-CRDT is a way to reduce the need for sending the full state
-for updates. For example adding element `'c'` and `'d'` to set `{'a', 'b'}` would
-result in sending the delta `{'c', 'd'}` and merge that with the state on the
-receiving side, resulting in set `{'a', 'b', 'c', 'd'}`.
-
-The protocol for replicating the deltas supports causal consistency if the data type
-is marked with `RequiresCausalDeliveryOfDeltas`. Otherwise it is only eventually
-consistent. Without causal consistency it means that if elements `'c'` and `'d'` are
-added in two separate *Update* operations these deltas may occasionally be propagated
-to nodes in a different order to the causal order of the updates. For this example it
-can result in that set `{'a', 'b', 'd'}` can be seen before element 'c' is seen. Eventually
-it will be `{'a', 'b', 'c', 'd'}`.
-
-Note that the full state is occasionally also replicated for delta-CRDTs, for example when
-new nodes are added to the cluster or when deltas could not be propagated because
-of network partitions or similar problems.
-
-The the delta propagation can be disabled with configuration property:
-
-```
-akka.cluster.distributed-data.delta-crdt.enabled=off
-```

 ### Consistency

 The consistency level that is supplied in the @ref:[Update](#update) and @ref:[Get](#get)
@@ -308,6 +308,17 @@ happens to be n4, n5, n6, n7, i.e. the value on n1, n2, n3 is not seen in the re
 @@@

+### Running separate instances of the replicator
+
+For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles,
+it makes sense to start separate replicators, this needs to be done on all nodes, or
+the group of nodes tagged with a specific role. To do this with Distributed Data you will first
+have to start a classic `Replicator` and pass it to the `Replicator.behavior` method that takes a classic
+actor ref. All such `Replicator`s must run on the same path in the classic actor hierarchy.
+
+A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator` instead of creating
+one via the `DistributedData` extension.
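One possible wiring (editorial sketch, not from this commit) of a role-specific replicator started from inside a typed actor; the `"backend"` role and all names are assumptions:

```scala
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.{ ddata => dd }
import akka.cluster.ddata.typed.scaladsl.Replicator

// role-restricted settings for the separate replicator (assumed role name)
val settings = dd.ReplicatorSettings(context.system.toClassic).withRole("backend")
// the classic replicator; it must be started on the same path on all participating nodes
val underlyingReplicator = context.toClassic.actorOf(dd.Replicator.props(settings), "ddataReplicator")
// wrap it in the typed Behavior and interact with it like the default replicator
val replicator = context.spawn(Replicator.behavior(settings, underlyingReplicator), "typedDdataReplicator")
```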
 ## Replicated data types

 Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types.
@@ -505,6 +516,32 @@ synchronized clocks when there is only one active writer, e.g. a Cluster Singlet
 single writer should then first read current value with `ReadMajority` (or more) before
 changing and writing the value with `WriteMajority` (or more).

+### Delta-CRDT
+
+[Delta State Replicated Data Types](http://arxiv.org/abs/1603.01529)
+are supported. A delta-CRDT is a way to reduce the need for sending the full state
+for updates. For example adding element `'c'` and `'d'` to set `{'a', 'b'}` would
+result in sending the delta `{'c', 'd'}` and merge that with the state on the
+receiving side, resulting in set `{'a', 'b', 'c', 'd'}`.
+
+The protocol for replicating the deltas supports causal consistency if the data type
+is marked with `RequiresCausalDeliveryOfDeltas`. Otherwise it is only eventually
+consistent. Without causal consistency it means that if elements `'c'` and `'d'` are
+added in two separate *Update* operations these deltas may occasionally be propagated
+to nodes in a different order to the causal order of the updates. For this example it
+can result in that set `{'a', 'b', 'd'}` can be seen before element 'c' is seen. Eventually
+it will be `{'a', 'b', 'c', 'd'}`.
+
+Note that the full state is occasionally also replicated for delta-CRDTs, for example when
+new nodes are added to the cluster or when deltas could not be propagated because
+of network partitions or similar problems.
+
+The the delta propagation can be disabled with configuration property:
+
+```
+akka.cluster.distributed-data.delta-crdt.enabled=off
+```

 ### Custom Data Type

 You can implement your own data types. The only requirement is that it implements
@@ -544,6 +581,16 @@ This is a protobuf representation of the above `TwoPhaseSet`:
 @@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset }

+The serializer for the `TwoPhaseSet`:
+
+Scala
+: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }
+
+Java
+: @@snip [TwoPhaseSetSerializer.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
+
+Note that the elements of the sets are sorted so the SHA-1 digests are the same for the same elements.
+
 You register the serializer in configuration:

 Scala
@@ -552,21 +599,34 @@ Scala
 Java
 : @@snip [DistributedDataDocSpec.scala](/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }

-For a serialization example and how to use with compression,
-see @ref[](../distributed-data.md#serialization) in the documentation of the Akka Classic APIs.
-
-### Running separate instances of the replicator
-
-For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles,
-it makes sense to start separate replicators, this needs to be done on all nodes, or
-the group of nodes tagged with a specific role. To do this with Distributed Data you will first
-have to start a classic `Replicator` and pass it to the `Replicator.behavior` method that takes a classic
-actor ref. All such `Replicator`s must run on the same path in the classic actor hierarchy.
-
-A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator` instead of creating
-one via the `DistributedData` extension.
-
-### Durable Storage
+Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
+provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:
+
+Scala
+: @@snip [TwoPhaseSetSerializer.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }
+
+Java
+: @@snip [TwoPhaseSetSerializerWithCompression.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
+
+The two embedded `GSet` can be serialized as illustrated above, but in general when composing
+new data types from the existing built in types it is better to make use of the existing
+serializer for those types. This can be done by declaring those as bytes fields in protobuf:
+
+@@snip [TwoPhaseSetMessages.proto](/akka-docs/src/test/../main/protobuf/TwoPhaseSetMessages.proto) { #twophaseset2 }
+
+and use the methods `otherMessageToProto` and `otherMessageFromBinary` that are provided
+by the `SerializationSupport` trait to serialize and deserialize the `GSet` instances. This
+works with any type that has a registered Akka serializer. This is how such an serializer would
+look like for the `TwoPhaseSet`:
+
+Scala
+: @@snip [TwoPhaseSetSerializer2.scala](/akka-docs/src/test/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }
+
+Java
+: @@snip [TwoPhaseSetSerializer2.java](/akka-docs/src/test/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
+
+## Durable Storage

 By default the data is only kept in memory. It is redundant since it is replicated to other nodes
 in the cluster, but if you stop all nodes the data is lost, unless you have saved it
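As an editorial aside (not part of this diff), durable entries are selected via configuration such as the following; the wildcard and directory values here are only example values:

```
# keys (or key prefixes ending in *) whose entries should survive restarts
akka.cluster.distributed-data.durable.keys = ["*"]
# directory for the LMDB files backing the durable store
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
```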
@@ -651,16 +711,6 @@ This would be possible if a node with durable data didn't participate in the pru
 be stopped for longer time than this duration and if it is joining again after this
 duration its data should first be manually removed (from the lmdb directory).

-### CRDT Garbage
-
-One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
-For example a `GCounter` keeps track of one counter per node. If a `GCounter` has been updated
-from one node it will associate the identifier of that node forever. That can become a problem
-for long running systems with many cluster nodes being added and removed. To solve this problem
-the `Replicator` performs pruning of data associated with nodes that have been removed from the
-cluster. Data types that need pruning have to implement the `RemovedNodePruning` trait. See the
-API documentation of the `Replicator` for details.

 ## Limitations

 There are some limitations that you should be aware of.
@@ -683,6 +733,16 @@ for example when new nodes are added to the cluster or when deltas could not be
 of network partitions or similar problems. This means that you cannot have too large
 data entries, because then the remote message size will be too large.

+### CRDT Garbage
+
+One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
+For example a `GCounter` keeps track of one counter per node. If a `GCounter` has been updated
+from one node it will associate the identifier of that node forever. That can become a problem
+for long running systems with many cluster nodes being added and removed. To solve this problem
+the `Replicator` performs pruning of data associated with nodes that have been removed from the
+cluster. Data types that need pruning have to implement the `RemovedNodePruning` trait. See the
+API documentation of the `Replicator` for details.

 ## Learn More about CRDTs

 * [Eventually Consistent Data Structures](https://vimeo.com/43903960)


@@ -387,7 +387,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
 public void demonstrateORSet() {
   // #orset
-  final Cluster node = Cluster.get(system);
+  final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
   final ORSet<String> s0 = ORSet.create();
   final ORSet<String> s1 = s0.add(node, "a");
   final ORSet<String> s2 = s1.add(node, "b");