=doc consolidate content of distributed-data.md (#23052)

Author: gosubpl
Committed: 2017-07-21 16:30:48 +02:00
Parent: aec87a94c4
Commit: a1ba7aab6f

2 changed files with 354 additions and 92 deletions

File: distributed-data.md (Java variant)
@@ -41,7 +41,11 @@ Below is an example of an actor that schedules tick messages to itself and for e
adds or removes elements from a `ORSet` (observed-remove set). It also subscribes to
changes of this.
-@@snip [DataBot.java]($code$/java/jdocs/ddata/DataBot.java) { #data-bot }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #data-bot }
+Java
+: @@snip [DataBot.java]($code$/java/jdocs/ddata/DataBot.java) { #data-bot }

<a id="replicator-update"></a>
### Update

@@ -55,7 +59,7 @@ will then be replicated according to the given consistency level.
The `modify` function is called by the `Replicator` actor and must therefore be a pure
function that only uses the data parameter and stable fields from enclosing scope. It must
-for example not access the sender reference of an enclosing actor.
+for example not access the sender (@scala[`sender()`]@java[`getSender()`]) reference of an enclosing actor.
`Update`
is intended to only be sent from an actor running in same local

@@ -66,7 +70,7 @@ for example not access the sender reference of an enclosing actor.
You supply a write consistency level which has the following meaning:
-* `writeLocal` the value will immediately only be written to the local replica,
+* @scala[`WriteLocal`]@java[`writeLocal`] the value will immediately only be written to the local replica,
and later disseminated with gossip
* `WriteTo(n)` the value will immediately be written to at least `n` replicas,
including the local replica

@@ -83,7 +87,11 @@ are prefered over unreachable nodes.
Note that `WriteMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update }
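For orientation between the hunks: a minimal sketch of what sending such an `Update` can look like on the Scala side, assuming the Akka 2.5-era API and an illustrative `CounterKey` that is not part of this commit:

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, PNCounter, PNCounterKey }
import akka.cluster.ddata.Replicator.{ Update, WriteMajority }

val system = ActorSystem("example")
implicit val node = Cluster(system) // identifies the local replica
val replicator = DistributedData(system).replicator
val CounterKey = PNCounterKey("hypothetical-counter")

// write to a majority of replicas (at least N/2 + 1) within the timeout;
// the modify function runs inside the Replicator, so it must stay pure
replicator ! Update(CounterKey, PNCounter.empty, WriteMajority(timeout = 3.seconds))(_ + 1)
```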
As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the sender of the
`Update` if the value was successfully replicated according to the supplied consistency

@@ -92,9 +100,18 @@ sent back. Note that a `Replicator.UpdateTimeout` reply does not mean that the u
or was rolled back. It may still have been replicated to some nodes, and will eventually
be replicated to all nodes with the gossip protocol.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response1 }
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response2 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response1 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response1 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response2 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response2 }
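A sketch of handling the reply messages that the response snippets above cover, under the same assumptions (illustrative key, Akka 2.5-era API):

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ddata.PNCounterKey
import akka.cluster.ddata.Replicator.{ ModifyFailure, UpdateSuccess, UpdateTimeout }

class UpdateReplyHandler extends Actor with ActorLogging {
  val CounterKey = PNCounterKey("hypothetical-counter")

  def receive = {
    case UpdateSuccess(CounterKey, _) =>
      log.info("update reached the requested write consistency")
    case UpdateTimeout(CounterKey, _) =>
      // not a rollback: the update may still reach all nodes via gossip
      log.warning("write consistency not reached within the timeout")
    case e: ModifyFailure[_] =>
      log.error(e.cause, "modify function failed: {}", e.errorMessage)
  }
}
```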
You will always see your own writes. For example if you send two `Update` messages
changing the value of the same `key`, the `modify` function of the second message will

@@ -105,7 +122,11 @@ does not care about, but is included in the reply messages. This is a convenient
way to pass contextual information (e.g. original sender) without having to use `ask`
or maintain local correlation data structures.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-request-context }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-request-context }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-request-context }
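A sketch of the request-context pattern documented above: the original sender rides along in the `Update` and comes back in the reply, so no `ask` or correlation map is needed (all names here are illustrative):

```scala
import akka.actor.{ Actor, ActorRef }
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, ORSet, ORSetKey }
import akka.cluster.ddata.Replicator.{ Update, UpdateSuccess, WriteLocal }

class Cache extends Actor {
  implicit val node = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator
  val SetKey = ORSetKey[String]("hypothetical-set")

  def receive = {
    case item: String =>
      // carry the original sender as the request context
      replicator ! Update(SetKey, ORSet.empty[String], WriteLocal, request = Some(sender()))(_ + item)
    case UpdateSuccess(SetKey, Some(replyTo: ActorRef)) =>
      replyTo ! "ack" // reply without any local correlation state
  }
}
```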
<a id="replicator-get"></a> <a id="replicator-get"></a>
### Get ### Get
@ -113,7 +134,7 @@ or maintain local correlation data structures.
To retrieve the current value of a data you send `Replicator.Get` message to the To retrieve the current value of a data you send `Replicator.Get` message to the
`Replicator`. You supply a consistency level which has the following meaning: `Replicator`. You supply a consistency level which has the following meaning:
* `readLocal` the value will only be read from the local replica * @scala[`ReadLocal`]@java[`readLocal`] the value will only be read from the local replica
* `ReadFrom(n)` the value will be read and merged from `n` replicas, * `ReadFrom(n)` the value will be read and merged from `n` replicas,
including the local replica including the local replica
* `ReadMajority` the value will be read and merged from a majority of replicas, i.e. * `ReadMajority` the value will be read and merged from a majority of replicas, i.e.
@ -124,16 +145,29 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
Note that `ReadMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters. Note that `ReadMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get }
As reply of the `Get` a `Replicator.GetSuccess` is sent to the sender of the As reply of the `Get` a `Replicator.GetSuccess` is sent to the sender of the
`Get` if the value was successfully retrieved according to the supplied consistency `Get` if the value was successfully retrieved according to the supplied consistency
level within the supplied timeout. Otherwise a `Replicator.GetFailure` is sent. level within the supplied timeout. Otherwise a `Replicator.GetFailure` is sent.
If the key does not exist the reply will be `Replicator.NotFound`. If the key does not exist the reply will be `Replicator.NotFound`.
@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response1 } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response1 }
@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response2 } Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response1 }
Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response2 }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response2 }
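A sketch of the read side shown in the tabs above: a `Get` with `ReadMajority` followed by the three possible replies (illustrative key, Akka 2.5-era API):

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ddata.{ DistributedData, PNCounterKey }
import akka.cluster.ddata.Replicator.{ Get, GetFailure, GetSuccess, NotFound, ReadMajority }

class Reader extends Actor with ActorLogging {
  val CounterKey = PNCounterKey("hypothetical-counter")

  DistributedData(context.system).replicator ! Get(CounterKey, ReadMajority(timeout = 3.seconds))

  def receive = {
    case g @ GetSuccess(CounterKey, _) =>
      log.info("current value: {}", g.get(CounterKey).value)
    case NotFound(CounterKey, _) =>
      log.info("key has never been written")
    case GetFailure(CounterKey, _) =>
      // read consistency not reached in time; the value may still exist
      log.warning("majority read timed out")
  }
}
```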
You will always read your own writes. For example if you send a `Update` message
followed by a `Get` of the same `key` the `Get` will retrieve the change that was

@@ -145,17 +179,21 @@ In the `Get` message you can pass an optional request context in the same way as
`Update` message, described above. For example the original sender can be passed and replied
to after receiving and transforming `GetSuccess`.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-request-context }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-request-context }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-request-context }

### Consistency

The consistency level that is supplied in the [Update](#replicator-update) and [Get](#replicator-get)
specifies per request how many replicas that must respond successfully to a write and read request.
-For low latency reads you use `ReadLocal` with the risk of retrieving stale data, i.e. updates
+For low latency reads you use @scala[`ReadLocal`]@java[`readLocal`] with the risk of retrieving stale data, i.e. updates
from other nodes might not be visible yet.
-When using `writeLocal` the update is only written to the local replica and then disseminated
+When using @scala[`WriteLocal`]@java[`writeLocal`] the update is only written to the local replica and then disseminated
in the background with the gossip protocol, which can take few seconds to spread to all nodes.
`WriteAll` and `ReadAll` is the strongest consistency level, but also the slowest and with

@@ -195,13 +233,27 @@ is useful to specify to achieve better safety for small clusters. It means that
size is smaller than the majority size it will use the `minCap` number of nodes but at most
the total size of the cluster.
-Here is an example of using `writeMajority` and `readMajority`:
+Here is an example of using `WriteMajority` and `ReadMajority`:
-@@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #read-write-majority }
-@@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #get-cart }
-@@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #add-item }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #read-write-majority }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #read-write-majority }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #get-cart }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #get-cart }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #add-item }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #add-item }

In some rare cases, when performing an `Update` it is needed to first try to fetch latest data from
other nodes. That can be done by first sending a `Get` with `ReadMajority` and then continue with

@@ -213,11 +265,15 @@ performed (hence the name observed-removed set).
The following example illustrates how to do that:
-@@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #remove-item }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #remove-item }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #remove-item }
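The remove-item tabs above implement the read-before-write pattern described here; a condensed sketch of that flow with illustrative names, using an `ORSet` instead of the shopping cart's `LWWMap`:

```scala
import scala.concurrent.duration._
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, ORSet, ORSetKey }
import akka.cluster.ddata.Replicator._

class Remover extends Actor {
  implicit val node = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator
  val SetKey = ORSetKey[String]("hypothetical-items")
  val readMajority = ReadMajority(3.seconds)
  val writeMajority = WriteMajority(3.seconds)

  case class RemoveItem(id: String)

  def receive = {
    case cmd: RemoveItem =>
      // fetch the latest state first, so the removal observes adds from other nodes
      replicator ! Get(SetKey, readMajority, request = Some(cmd))
    case GetSuccess(SetKey, Some(RemoveItem(id))) =>
      replicator ! Update(SetKey, ORSet.empty[String], writeMajority)(_ - id)
    case NotFound(SetKey, Some(_: RemoveItem)) =>
      // nothing stored yet, so nothing to remove
  }
}
```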
@@@ warning

-*Caveat:* Even if you use `writeMajority` and `readMajority` there is small risk that you may
+*Caveat:* Even if you use `WriteMajority` and `ReadMajority` there is small risk that you may
read stale data if the cluster membership has changed between the `Update` and the `Get`.
For example, in cluster of 5 nodes when you `Update` and that change is written to 3 nodes:
n1, n2, n3. Then 2 more nodes are added and a `Get` request is reading from 4 nodes, which

@@ -238,7 +294,11 @@ immediately.
The subscriber is automatically removed if the subscriber is terminated. A subscriber can
also be deregistered with the `Replicator.Unsubscribe` message.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #subscribe }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #subscribe }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #subscribe }
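A sketch of the subscription flow the tabs above cover: subscribe once, then react to `Changed` notifications (illustrative key):

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ddata.{ DistributedData, PNCounterKey }
import akka.cluster.ddata.Replicator.{ Changed, Subscribe }

class CounterWatcher extends Actor with ActorLogging {
  val CounterKey = PNCounterKey("hypothetical-counter")

  // the replicator sends Changed(CounterKey) to self whenever the entry changes
  DistributedData(context.system).replicator ! Subscribe(CounterKey, self)

  def receive = {
    case c @ Changed(CounterKey) =>
      log.info("counter is now {}", c.get(CounterKey).value)
  }
}
```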
### Delete

@@ -253,9 +313,17 @@ to all nodes.
A deleted key cannot be reused again, but it is still recommended to delete unused
data entries because that reduces the replication overhead when new nodes join the cluster.
Subsequent `Delete`, `Update` and `Get` requests will be replied with `Replicator.DataDeleted`.
-Subscribers will receive `Replicator.DataDeleted`.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #delete }
+Subscribers will receive `Replicator.Deleted`.
+
+In the *Delete* message you can pass an optional request context in the same way as for the
+*Update* message, described above. For example the original sender can be passed and replied
+to after receiving and transforming *DeleteSuccess*.
+
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #delete }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #delete }

@@@ warning

@@ -296,11 +364,11 @@ akka.cluster.distributed-data.delta-crdt.enabled=off
## Data Types

-The data types must be convergent (stateful) CRDTs and implement the `ReplicatedData` trait,
+The data types must be convergent (stateful) CRDTs and implement the @scala[`ReplicatedData` trait]@java[`AbstractReplicatedData` interface],
i.e. they provide a monotonic merge function and the state changes always converge.
-You can use your own custom `AbstractReplicatedData` or `AbstractDeltaReplicatedData` types,
-and several types are provided by this package, such as:
+You can use your own custom @scala[`ReplicatedData` or `DeltaReplicatedData`]@java[`AbstractReplicatedData` or `AbstractDeltaReplicatedData`] types, and several types are provided
+by this package, such as:

* Counters: `GCounter`, `PNCounter`
* Sets: `GSet`, `ORSet`

@@ -321,7 +389,11 @@ It is tracking the increments (P) separate from the decrements (N). Both P and N
as two internal `GCounter`. Merge is handled by merging the internal P and N counters.
The value of the counter is the value of the P counter minus the value of the N counter.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncounter }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncounter }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncounter }

`GCounter` and `PNCounter` have support for [delta-CRDT](#delta-crdt) and don't need causal
delivery of deltas.
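A minimal sketch of the P/N bookkeeping just described (the local `Cluster` node tags each operation):

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.PNCounter

val system = ActorSystem("example")
implicit val node = Cluster(system)

val c0 = PNCounter.empty
val c1 = c0 + 7 // recorded in the internal P counter
val c2 = c1 - 2 // recorded in the internal N counter
println(c2.value) // 5, i.e. P (7) minus N (2)
```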
@@ -331,7 +403,11 @@ When the counters are placed in a `PNCounterMap` as opposed to placing them as s
values they are guaranteed to be replicated together as one unit, which is sometimes necessary for
related data.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncountermap }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncountermap }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncountermap }

### Sets

@@ -339,7 +415,11 @@ If you only need to add elements to a set and not remove elements the `GSet` (gr
the data type to use. The elements can be any type of values that can be serialized.
Merge is simply the union of the two sets.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #gset }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #gset }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #gset }

`GSet` has support for [delta-CRDT](#delta-crdt) and it doesn't require causal delivery of deltas.

@@ -352,7 +432,11 @@ The version for the node that added the element is also tracked for each element
called "birth dot". The version vector and the dots are used by the `merge` function to
track causality of the operations and resolve concurrent updates.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #orset }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #orset }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #orset }

`ORSet` has support for [delta-CRDT](#delta-crdt) and it requires causal delivery of deltas.
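A sketch of `ORSet` add/remove semantics; each operation is tagged with the local node, which is what lets `merge` favour a concurrent add over a remove it has not observed:

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet

val system = ActorSystem("example")
implicit val node = Cluster(system)

val s0 = ORSet.empty[String]
val s1 = s0 + "a" + "b"
// the removal only wins over the additions of "a" it has observed
val s2 = s1 - "a"
println(s2.elements) // Set(b)
```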
@@ -374,8 +458,8 @@ such as the following specialized maps.
`ORMultiMap` (observed-remove multi-map) is a multi-map implementation that wraps an
`ORMap` with an `ORSet` for the map's value.

-`PNCounterMap` (positive negative counter map) is a map of named counters. It is a specialized
-`ORMap` with `PNCounter` values.
+`PNCounterMap` (positive negative counter map) is a map of named counters (where the name can be of any type).
+It is a specialized `ORMap` with `PNCounter` values.

`LWWMap` (last writer wins map) is a specialized `ORMap` with `LWWRegister` (last writer wins register)
values.

@@ -396,7 +480,11 @@ There is ongoing work aimed at removing necessity of creation of the aforementio
that despite having the same Scala type, `ORMultiMap.emptyWithValueDeltas` is not compatible with 'vanilla' `ORMultiMap`,
because of different replication mechanism.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #ormultimap }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #ormultimap }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #ormultimap }

When a data entry is changed the full state of that entry is replicated to other nodes, i.e.
when you update a map the whole map is replicated. Therefore, instead of using one `ORMap`

@@ -415,7 +503,11 @@ in the below section about `LWWRegister`.
`Flag` is a data type for a boolean value that is initialized to `false` and can be switched
to `true`. Thereafter it cannot be changed. `true` wins over `false` in merge.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #flag }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #flag }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #flag }

`LWWRegister` (last writer wins register) can hold any (serializable) value.

@@ -426,13 +518,21 @@ value is not important for concurrent updates occurring within the clock skew.
Merge takes the register updated by the node with lowest address (`UniqueAddress` is ordered)
if the timestamps are exactly the same.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister }

Instead of using timestamps based on `System.currentTimeMillis()` time it is possible to
use a timestamp value based on something else, for example an increasing version number
from a database record that is used for optimistic concurrency control.
-@@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister-custom-clock }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister-custom-clock }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister-custom-clock }

For first-write-wins semantics you can use the `LWWRegister#reverseClock` instead of the
`LWWRegister#defaultClock`.
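A sketch of such a custom clock, using a hypothetical `Record` whose database version number replaces the wall-clock timestamp:

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.LWWRegister

case class Record(version: Int, name: String)

val system = ActorSystem("example")
implicit val node = Cluster(system)

// the register's timestamp becomes the record's version number
implicit val recordClock = new LWWRegister.Clock[Record] {
  override def apply(currentTimestamp: Long, value: Record): Long = value.version
}

val r1 = LWWRegister(Record(version = 1, name = "Alice"))
val r2 = r1.withValue(Record(version = 2, name = "Alice B."))
println(r2.value.version) // 2
```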
@@ -447,7 +547,7 @@ changing and writing the value with `WriteMajority` (or more).
### Custom Data Type

You can rather easily implement your own data types. The only requirement is that it implements
-the `mergeData` function of the `AbstractReplicatedData` class.
+the @scala[`merge`]@java[`mergeData`] function of the @scala[`ReplicatedData`]@java[`AbstractReplicatedData`] trait.
A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several
smaller data types to build richer data structures. For example, the `PNCounter` is composed of

@@ -457,11 +557,15 @@ Here is s simple implementation of a custom `TwoPhaseSet` that is using two inte
to keep track of addition and removals. A `TwoPhaseSet` is a set where an element may be added and
removed, but never added again thereafter.
-@@snip [TwoPhaseSet.java]($code$/java/jdocs/ddata/TwoPhaseSet.java) { #twophaseset }
+Scala
+: @@snip [TwoPhaseSet.scala]($code$/scala/docs/ddata/TwoPhaseSet.scala) { #twophaseset }
+Java
+: @@snip [TwoPhaseSet.java]($code$/java/jdocs/ddata/TwoPhaseSet.java) { #twophaseset }

Data types should be immutable, i.e. "modifying" methods should return a new instance.
-Implement the additional methods of `AbstractDeltaReplicatedData` if it has support for delta-CRDT replication.
+Implement the additional methods of @scala[`DeltaReplicatedData`]@java[`AbstractDeltaReplicatedData`] if it has support for delta-CRDT replication.
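A condensed Scala sketch of what such a `TwoPhaseSet` can look like (the commit's real snippet lives in the tabs above): two grow-only sets, with `merge` taking the union on each side:

```scala
import akka.cluster.ddata.{ GSet, ReplicatedData }

case class TwoPhaseSet(
    adds: GSet[String] = GSet.empty[String],
    removals: GSet[String] = GSet.empty[String]) extends ReplicatedData {

  type T = TwoPhaseSet

  def add(element: String): TwoPhaseSet = copy(adds = adds.add(element))
  def remove(element: String): TwoPhaseSet = copy(removals = removals.add(element))
  def elements: Set[String] = adds.elements diff removals.elements

  // monotonic merge: union both internal GSets
  override def merge(that: TwoPhaseSet): TwoPhaseSet =
    copy(adds = this.adds.merge(that.adds), removals = this.removals.merge(that.removals))
}
```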
#### Serialization

@@ -481,19 +585,31 @@ This is a protobuf representation of the above `TwoPhaseSet`:
The serializer for the `TwoPhaseSet`:
-@@snip [TwoPhaseSetSerializer.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
+Scala
+: @@snip [TwoPhaseSetSerializer.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }
+Java
+: @@snip [TwoPhaseSetSerializer.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }

Note that the elements of the sets are sorted so the SHA-1 digests are the same
for the same elements.

You register the serializer in configuration:
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #serializer-config }
+Java
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }

Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
-provided by the `akka.cluster.ddata.protobuf.SerializationSupport` trait:
+provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:
-@@snip [TwoPhaseSetSerializerWithCompression.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
+Scala
+: @@snip [TwoPhaseSetSerializer.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }
+Java
+: @@snip [TwoPhaseSetSerializerWithCompression.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }

The two embedded `GSet` can be serialized as illustrated above, but in general when composing
new data types from the existing built in types it is better to make use of the existing

@@ -506,7 +622,11 @@ by the `SerializationSupport` trait to serialize and deserialize the `GSet` inst
works with any type that has a registered Akka serializer. This is how such an serializer would
look like for the `TwoPhaseSet`:
-@@snip [TwoPhaseSetSerializer2.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
+Scala
+: @@snip [TwoPhaseSetSerializer2.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }
+Java
+: @@snip [TwoPhaseSetSerializer2.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }

<a id="ddata-durable"></a>
### Durable Storage

@@ -532,23 +652,36 @@ All entries can be made durable by specifying:
akka.cluster.distributed-data.durable.keys = ["*"]
```

-[LMDB](https://github.com/lmdbjava/lmdbjava/) is the default storage implementation. It is
+@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is
possible to replace that with another implementation by implementing the actor protocol described in
`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class`
property for the new implementation.

The location of the files for the data is configured with:
-```
+Scala
+: ```
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata'
# the full name of the directory will contain name of the ActorSystem
# and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
# a directory.
-akka.cluster.distributed-data.lmdb.dir = "ddata"
+akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
```
+Java
+: ```
+# Directory of LMDB file. There are two options:
+# 1. A relative or absolute path to a directory that ends with 'ddata'
+# the full name of the directory will contain name of the ActorSystem
+# and its remote port.
+# 2. Otherwise the path is used as is, as a relative or absolute path to
+# a directory.
+akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
+```

When running in production you may want to configure the directory to a specific
path (alt 2), since the default directory contains the remote port of the
actor system to make the name unique. If using a dynamically assigned

@@ -595,7 +728,7 @@ API documentation of the `Replicator` for details.
## Samples

Several interesting samples are included and described in the
-tutorial named @extref[Akka Distributed Data Samples with Java](ecs:akka-samples-distributed-data-java) (@extref[source code](samples:akka-sample-distributed-data-java))
+tutorial named @scala[@extref[Akka Distributed Data Samples with Scala](ecs:akka-samples-distributed-data-scala) (@extref[source code](samples:akka-sample-distributed-data-scala))]@java[@extref[Akka Distributed Data Samples with Java](ecs:akka-samples-distributed-data-java) (@extref[source code](samples:akka-sample-distributed-data-java))]

* Low Latency Voting Service
* Highly Available Shopping Cart

@@ -662,4 +795,4 @@ Maven
The `DistributedData` extension can be configured with the following properties:

@@snip [reference.conf]($akka$/akka-distributed-data/src/main/resources/reference.conf) { #distributed-data }

File: distributed-data.md (Scala variant)
@@ -41,7 +41,11 @@ Below is an example of an actor that schedules tick messages to itself and for e
adds or removes elements from a `ORSet` (observed-remove set). It also subscribes to
changes of this.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #data-bot }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #data-bot }
+Java
+: @@snip [DataBot.java]($code$/java/jdocs/ddata/DataBot.java) { #data-bot }

<a id="replicator-update"></a>
### Update

@@ -55,7 +59,7 @@ will then be replicated according to the given consistency level.
The `modify` function is called by the `Replicator` actor and must therefore be a pure
function that only uses the data parameter and stable fields from enclosing scope. It must
-for example not access `sender()` reference of an enclosing actor.
+for example not access the sender (@scala[`sender()`]@java[`getSender()`]) reference of an enclosing actor.
`Update`
is intended to only be sent from an actor running in same local

@@ -66,7 +70,7 @@ for example not access `sender()` reference of an enclosing actor.
You supply a write consistency level which has the following meaning:
-* `WriteLocal` the value will immediately only be written to the local replica,
+* @scala[`WriteLocal`]@java[`writeLocal`] the value will immediately only be written to the local replica,
and later disseminated with gossip
* `WriteTo(n)` the value will immediately be written to at least `n` replicas,
including the local replica

@@ -83,7 +87,11 @@ are prefered over unreachable nodes.
Note that `WriteMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update }

As reply of the `Update` a `Replicator.UpdateSuccess` is sent to the sender of the
`Update` if the value was successfully replicated according to the supplied consistency

@@ -92,9 +100,18 @@ sent back. Note that a `Replicator.UpdateTimeout` reply does not mean that the u
or was rolled back. It may still have been replicated to some nodes, and will eventually
be replicated to all nodes with the gossip protocol.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response1 }
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response2 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response1 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response1 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-response2 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-response2 }

You will always see your own writes. For example if you send two `Update` messages
changing the value of the same `key`, the `modify` function of the second message will

@@ -105,7 +122,11 @@ does not care about, but is included in the reply messages. This is a convenient
way to pass contextual information (e.g. original sender) without having to use `ask`
or maintain local correlation data structures.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-request-context }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #update-request-context }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #update-request-context }

<a id="replicator-get"></a>
### Get

@@ -113,7 +134,7 @@ or maintain local correlation data structures.
To retrieve the current value of a data you send `Replicator.Get` message to the
`Replicator`. You supply a consistency level which has the following meaning:
-* `ReadLocal` the value will only be read from the local replica
+* @scala[`ReadLocal`]@java[`readLocal`] the value will only be read from the local replica
* `ReadFrom(n)` the value will be read and merged from `n` replicas,
including the local replica
* `ReadMajority` the value will be read and merged from a majority of replicas, i.e.

@@ -124,16 +145,29 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
Note that `ReadMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get }

As reply of the `Get` a `Replicator.GetSuccess` is sent to the sender of the
`Get` if the value was successfully retrieved according to the supplied consistency
level within the supplied timeout. Otherwise a `Replicator.GetFailure` is sent.
If the key does not exist the reply will be `Replicator.NotFound`.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response1 }
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response2 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response1 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response1 }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-response2 }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-response2 }

You will always read your own writes. For example if you send a `Update` message
followed by a `Get` of the same `key` the `Get` will retrieve the change that was

@@ -145,17 +179,21 @@ In the `Get` message you can pass an optional request context in the same way as
`Update` message, described above. For example the original sender can be passed and replied
to after receiving and transforming `GetSuccess`.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-request-context }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #get-request-context }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #get-request-context }

### Consistency

The consistency level that is supplied in the [Update](#replicator-update) and [Get](#replicator-get)
specifies per request how many replicas that must respond successfully to a write and read request.
-For low latency reads you use `ReadLocal` with the risk of retrieving stale data, i.e. updates
+For low latency reads you use @scala[`ReadLocal`]@java[`readLocal`] with the risk of retrieving stale data, i.e. updates
from other nodes might not be visible yet.
-When using `WriteLocal` the update is only written to the local replica and then disseminated
+When using @scala[`WriteLocal`]@java[`writeLocal`] the update is only written to the local replica and then disseminated
in the background with the gossip protocol, which can take few seconds to spread to all nodes.
`WriteAll` and `ReadAll` is the strongest consistency level, but also the slowest and with

@@ -197,11 +235,25 @@ the total size of the cluster.
Here is an example of using `WriteMajority` and `ReadMajority`:
-@@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #read-write-majority }
-@@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #get-cart }
-@@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #add-item }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #read-write-majority }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #read-write-majority }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #get-cart }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #get-cart }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #add-item }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #add-item }

In some rare cases, when performing an `Update` it is needed to first try to fetch latest data from
other nodes. That can be done by first sending a `Get` with `ReadMajority` and then continue with

@@ -213,7 +265,11 @@ performed (hence the name observed-removed set).
The following example illustrates how to do that:
-@@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #remove-item }
+Scala
+: @@snip [ShoppingCart.scala]($code$/scala/docs/ddata/ShoppingCart.scala) { #remove-item }
+Java
+: @@snip [ShoppingCart.java]($code$/java/jdocs/ddata/ShoppingCart.java) { #remove-item }

@@@ warning

@@ -238,7 +294,11 @@ immediately.
The subscriber is automatically removed if the subscriber is terminated. A subscriber can
also be deregistered with the `Replicator.Unsubscribe` message.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #subscribe }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #subscribe }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #subscribe }

### Delete

@@ -259,7 +319,11 @@ In the *Delete* message you can pass an optional request context in the same way
*Update* message, described above. For example the original sender can be passed and replied
to after receiving and transforming *DeleteSuccess*.
-@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #delete }
+Scala
+: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #delete }
+Java
+: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #delete }
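A sketch of the `Delete` flow with the request context that this change documents, assuming the optional `request` field is available in the Akka version this commit targets (names are illustrative):

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, ORSetKey }
import akka.cluster.ddata.Replicator.{ Delete, DeleteSuccess, ReplicationDeleteFailure, WriteMajority }

class Cleaner extends Actor {
  val replicator = DistributedData(context.system).replicator
  val SetKey = ORSetKey[String]("hypothetical-items")

  def receive = {
    case "cleanup" =>
      // pass the original sender along as the request context
      replicator ! Delete(SetKey, WriteMajority(3.seconds), request = Some(sender()))
    case DeleteSuccess(SetKey, Some(replyTo: ActorRef)) =>
      replyTo ! "deleted"
    case e: ReplicationDeleteFailure[_] =>
      // consistency not reached in time; the deletion will still spread via gossip
  }
}
```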
@@@ warning @@@ warning
@ -300,10 +364,10 @@ akka.cluster.distributed-data.delta-crdt.enabled=off
## Data Types ## Data Types
The data types must be convergent (stateful) CRDTs and implement the `ReplicatedData` trait, The data types must be convergent (stateful) CRDTs and implement the @scala[`ReplicatedData` trait]@java[`AbstractReplicatedData` interface],
i.e. they provide a monotonic merge function and the state changes always converge. i.e. they provide a monotonic merge function and the state changes always converge.
You can use your own custom `ReplicatedData` or `DeltaReplicatedData` types, and several types are provided You can use your own custom @scala[`ReplicatedData` or `DeltaReplicatedData`]@java[`AbstractReplicatedData` or `AbstractDeltaReplicatedData`] types, and several types are provided
by this package, such as: by this package, such as:
* Counters: `GCounter`, `PNCounter` * Counters: `GCounter`, `PNCounter`
@ -325,7 +389,11 @@ It is tracking the increments (P) separate from the decrements (N). Both P and N
as two internal `GCounter`. Merge is handled by merging the internal P and N counters. as two internal `GCounter`. Merge is handled by merging the internal P and N counters.
The value of the counter is the value of the P counter minus the value of the N counter. The value of the counter is the value of the P counter minus the value of the N counter.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncounter } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncounter }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncounter }
`GCounter` and `PNCounter` have support for [delta-CRDT](#delta-crdt) and don't need causal `GCounter` and `PNCounter` have support for [delta-CRDT](#delta-crdt) and don't need causal
delivery of deltas. delivery of deltas.
@ -335,7 +403,11 @@ When the counters are placed in a `PNCounterMap` as opposed to placing them as s
values they are guaranteed to be replicated together as one unit, which is sometimes necessary for values they are guaranteed to be replicated together as one unit, which is sometimes necessary for
related data. related data.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncountermap } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #pncountermap }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #pncountermap }
### Sets ### Sets
@ -343,7 +415,11 @@ If you only need to add elements to a set and not remove elements the `GSet` (gr
the data type to use. The elements can be any type of values that can be serialized. the data type to use. The elements can be any type of values that can be serialized.
Merge is simply the union of the two sets. Merge is simply the union of the two sets.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #gset } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #gset }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #gset }
`GSet` has support for [delta-CRDT](#delta-crdt) and it doesn't require causal delivery of deltas. `GSet` has support for [delta-CRDT](#delta-crdt) and it doesn't require causal delivery of deltas.
@ -356,7 +432,11 @@ The version for the node that added the element is also tracked for each element
called "birth dot". The version vector and the dots are used by the `merge` function to called "birth dot". The version vector and the dots are used by the `merge` function to
track causality of the operations and resolve concurrent updates. track causality of the operations and resolve concurrent updates.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #orset } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #orset }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #orset }
`ORSet` has support for [delta-CRDT](#delta-crdt) and it requires causal delivery of deltas. `ORSet` has support for [delta-CRDT](#delta-crdt) and it requires causal delivery of deltas.
@ -400,7 +480,11 @@ There is ongoing work aimed at removing necessity of creation of the aforementio
that despite having the same Scala type, `ORMultiMap.emptyWithValueDeltas` is not compatible with 'vanilla' `ORMultiMap`, that despite having the same Scala type, `ORMultiMap.emptyWithValueDeltas` is not compatible with 'vanilla' `ORMultiMap`,
because of different replication mechanism. because of different replication mechanism.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #ormultimap } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #ormultimap }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #ormultimap }
When a data entry is changed the full state of that entry is replicated to other nodes, i.e. When a data entry is changed the full state of that entry is replicated to other nodes, i.e.
when you update a map the whole map is replicated. Therefore, instead of using one `ORMap` when you update a map the whole map is replicated. Therefore, instead of using one `ORMap`
@ -419,7 +503,11 @@ in the below section about `LWWRegister`.
`Flag` is a data type for a boolean value that is initialized to `false` and can be switched `Flag` is a data type for a boolean value that is initialized to `false` and can be switched
to `true`. Thereafter it cannot be changed. `true` wins over `false` in merge. to `true`. Thereafter it cannot be changed. `true` wins over `false` in merge.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #flag } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #flag }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #flag }
`LWWRegister` (last writer wins register) can hold any (serializable) value. `LWWRegister` (last writer wins register) can hold any (serializable) value.
@ -430,13 +518,21 @@ value is not important for concurrent updates occurring within the clock skew.
Merge takes the register updated by the node with lowest address (`UniqueAddress` is ordered) Merge takes the register updated by the node with lowest address (`UniqueAddress` is ordered)
if the timestamps are exactly the same. if the timestamps are exactly the same.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister }
Instead of using timestamps based on `System.currentTimeMillis()` time it is possible to Instead of using timestamps based on `System.currentTimeMillis()` time it is possible to
use a timestamp value based on something else, for example an increasing version number use a timestamp value based on something else, for example an increasing version number
from a database record that is used for optimistic concurrency control. from a database record that is used for optimistic concurrency control.
@@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister-custom-clock } Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #lwwregister-custom-clock }
Java
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #lwwregister-custom-clock }
For first-write-wins semantics you can use the `LWWRegister#reverseClock` instead of the For first-write-wins semantics you can use the `LWWRegister#reverseClock` instead of the
`LWWRegister#defaultClock`. `LWWRegister#defaultClock`.
@ -451,7 +547,7 @@ changing and writing the value with `WriteMajority` (or more).
### Custom Data Type ### Custom Data Type
You can rather easily implement your own data types. The only requirement is that it implements You can rather easily implement your own data types. The only requirement is that it implements
the `merge` function of the `ReplicatedData` trait. the @scala[`merge`]@java[`mergeData`] function of the @scala[`ReplicatedData`]@java[`AbstractReplicatedData`] trait.
A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several
smaller data types to build richer data structures. For example, the `PNCounter` is composed of smaller data types to build richer data structures. For example, the `PNCounter` is composed of
@@ -461,11 +557,15 @@ Here is a simple implementation of a custom `TwoPhaseSet` that is using two inte
to keep track of additions and removals. A `TwoPhaseSet` is a set where an element may be added and
removed, but never added again thereafter.

Scala
: @@snip [TwoPhaseSet.scala]($code$/scala/docs/ddata/TwoPhaseSet.scala) { #twophaseset }

Java
: @@snip [TwoPhaseSet.java]($code$/java/jdocs/ddata/TwoPhaseSet.java) { #twophaseset }
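In condensed form such a set could look like the sketch below (element type fixed to `String` for brevity; the snipped sample is the authoritative version):

```
import akka.cluster.ddata.{ GSet, ReplicatedData }

final case class TwoPhaseSet(
  adds: GSet[String] = GSet.empty,
  removals: GSet[String] = GSet.empty) extends ReplicatedData {

  type T = TwoPhaseSet

  def add(element: String): TwoPhaseSet = copy(adds = adds.add(element))

  def remove(element: String): TwoPhaseSet = copy(removals = removals.add(element))

  def elements: Set[String] = adds.elements diff removals.elements

  // merge is the only required operation: merge the embedded GSets pairwise.
  override def merge(that: TwoPhaseSet): TwoPhaseSet =
    copy(adds = adds.merge(that.adds), removals = removals.merge(that.removals))
}
```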
Data types should be immutable, i.e. "modifying" methods should return a new instance.

Implement the additional methods of @scala[`DeltaReplicatedData`]@java[`AbstractDeltaReplicatedData`] if it has support for delta-CRDT replication.

#### Serialization
@@ -485,19 +585,31 @@ This is a protobuf representation of the above `TwoPhaseSet`:
The serializer for the `TwoPhaseSet`:

Scala
: @@snip [TwoPhaseSetSerializer.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #serializer }

Java
: @@snip [TwoPhaseSetSerializer.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer.java) { #serializer }
Note that the elements of the sets are sorted so the SHA-1 digests are the same
for the same elements.
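To make the sorting point concrete, here is a deliberately naive string-based sketch (not the protobuf serializer from the snippets; it assumes elements never contain `,` or `|`):

```
import java.nio.charset.StandardCharsets
import akka.cluster.ddata.GSet
import akka.serialization.Serializer

class NaiveTwoPhaseSetSerializer extends Serializer {
  override def identifier: Int = 99999 // assumption: must be unique per serializer
  override def includeManifest: Boolean = false

  override def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case s: TwoPhaseSet =>
      // Sorting makes the bytes, and thus the SHA-1 digests, equal for equal sets.
      val adds = s.adds.elements.toVector.sorted.mkString(",")
      val removals = s.removals.elements.toVector.sorted.mkString(",")
      s"$adds|$removals".getBytes(StandardCharsets.UTF_8)
    case _ => throw new IllegalArgumentException(s"Can't serialize $obj")
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val Array(adds, removals) = new String(bytes, StandardCharsets.UTF_8).split("\\|", -1)
    def gset(s: String): GSet[String] =
      s.split(",").filter(_.nonEmpty).foldLeft(GSet.empty[String])(_ add _)
    TwoPhaseSet(gset(adds), gset(removals))
  }
}
```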
You register the serializer in configuration:

Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #serializer-config }

Java
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #japi-serializer-config }
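Such a registration follows the standard Akka serialization configuration shape; the class and binding names below are placeholders for your own:

```
akka.actor {
  serializers {
    two-phase-set = "docs.ddata.protobuf.TwoPhaseSetSerializer"
  }
  serialization-bindings {
    "docs.ddata.TwoPhaseSet" = two-phase-set
  }
}
```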
Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
provided by the @scala[`akka.cluster.ddata.protobuf.SerializationSupport` trait]@java[`akka.cluster.ddata.protobuf.AbstractSerializationSupport` interface]:

Scala
: @@snip [TwoPhaseSetSerializer.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer.scala) { #compression }

Java
: @@snip [TwoPhaseSetSerializerWithCompression.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializerWithCompression.java) { #compression }
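The same effect can be achieved with the standard JDK gzip streams; a standalone Scala sketch of such a round trip, for illustration only (prefer the helpers provided by the trait):

```
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }

def compress(bytes: Array[Byte]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val zip = new GZIPOutputStream(bos)
  try zip.write(bytes) finally zip.close()
  bos.toByteArray
}

def decompress(bytes: Array[Byte]): Array[Byte] = {
  val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
  val out = new ByteArrayOutputStream()
  val buffer = new Array[Byte](4096)
  try Iterator.continually(in.read(buffer)).takeWhile(_ != -1).foreach(out.write(buffer, 0, _))
  finally in.close()
  out.toByteArray
}
```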
The two embedded `GSet`s can be serialized as illustrated above, but in general when composing
new data types from the existing built-in types it is better to make use of the existing
@@ -510,7 +622,11 @@ by the `SerializationSupport` trait to serialize and deserialize the `GSet` inst
works with any type that has a registered Akka serializer. This is what such a serializer would
look like for the `TwoPhaseSet`:

Scala
: @@snip [TwoPhaseSetSerializer2.scala]($code$/scala/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala) { #serializer }

Java
: @@snip [TwoPhaseSetSerializer2.java]($code$/java/jdocs/ddata/protobuf/TwoPhaseSetSerializer2.java) { #serializer }
<a id="ddata-durable"></a>
### Durable Storage
@@ -536,14 +652,15 @@ All entries can be made durable by specifying:
```
akka.cluster.distributed-data.durable.keys = ["*"]
```
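Durability can also be limited to selected entries; per the comments in `reference.conf`, the list may name individual keys, with prefix matching via a trailing `*` (the key names below are examples only):

```
akka.cluster.distributed-data.durable.keys = ["settings", "cache-*"]
```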
@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is
possible to replace that with another implementation by implementing the actor protocol described in
`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class`
property for the new implementation.
The location of the files for the data is configured with:

Scala
: ```
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata'
#    the full name of the directory will contain name of the ActorSystem
#    and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
#    a directory.
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
```

Java
: ```
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata'
#    the full name of the directory will contain name of the ActorSystem
#    and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
#    a directory.
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
```
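For example, a fixed absolute path (alternative 2, as discussed below; the path itself is a placeholder):

```
akka.cluster.distributed-data.durable.lmdb.dir = "/var/lib/my-service/ddata"
```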
When running in production you may want to configure the directory to a specific
path (alt 2), since the default directory contains the remote port of the
actor system to make the name unique. If using a dynamically assigned
@@ -599,7 +728,7 @@ API documentation of the `Replicator` for details.
## Samples

Several interesting samples are included and described in the
tutorial named @scala[@extref[Akka Distributed Data Samples with Scala](ecs:akka-samples-distributed-data-scala) (@extref[source code](samples:akka-sample-distributed-data-scala))]@java[@extref[Akka Distributed Data Samples with Java](ecs:akka-samples-distributed-data-java) (@extref[source code](samples:akka-sample-distributed-data-java))]

* Low Latency Voting Service
* Highly Available Shopping Cart
@@ -650,7 +779,7 @@ sbt
"com.typesafe.akka" %% "akka-distributed-data" % "$akka.version$" "com.typesafe.akka" %% "akka-distributed-data" % "$akka.version$"
``` ```
@@@ @@@
Maven Maven
: @@@vars : @@@vars
``` ```
@@ -666,4 +795,4 @@ Maven
The `DistributedData` extension can be configured with the following properties:

@@snip [reference.conf]($akka$/akka-distributed-data/src/main/resources/reference.conf) { #distributed-data }