Minor adjustments to replicated es docs (#29450)

* Minor adjustments to replicated es docs

* and a few aa leftovers

* link to more examples
This commit is contained in:
Patrik Nordwall 2020-08-04 08:12:45 +02:00 committed by Christopher Batey
parent 9fb76bbea4
commit cd821fe3f8
16 changed files with 151 additions and 127 deletions

View file

@ -29,6 +29,7 @@ import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite; import org.scalatestplus.junit.JUnitSuite;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -144,7 +145,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
private final ReplicatedSharding< private final ReplicatedSharding<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>> MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
aaSharding; replicatedSharding;
private ProxyActor(ActorContext<Command> context) { private ProxyActor(ActorContext<Command> context) {
super(context); super(context);
@ -152,7 +153,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
// #bootstrap // #bootstrap
ReplicatedShardingSettings< ReplicatedShardingSettings<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>> MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
aaShardingSettings = replicatedShardingSettings =
ReplicatedShardingSettings.create( ReplicatedShardingSettings.create(
MyReplicatedStringSet.Command.class, MyReplicatedStringSet.Command.class,
ALL_REPLICAS, ALL_REPLICAS,
@ -176,8 +177,12 @@ public class ReplicatedShardingTest extends JUnitSuite {
ReplicatedShardingExtension extension = ReplicatedShardingExtension extension =
ReplicatedShardingExtension.get(getContext().getSystem()); ReplicatedShardingExtension.get(getContext().getSystem());
aaSharding = extension.init(aaShardingSettings); ReplicatedSharding<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
replicatedSharding = extension.init(replicatedShardingSettings);
// #bootstrap // #bootstrap
this.replicatedSharding = replicatedSharding;
} }
@Override @Override
@ -190,8 +195,8 @@ public class ReplicatedShardingTest extends JUnitSuite {
private Behavior<Command> onForwardToRandom(ForwardToRandom forwardToRandom) { private Behavior<Command> onForwardToRandom(ForwardToRandom forwardToRandom) {
Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs = Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToRandom.entityId); replicatedSharding.getEntityRefsFor(forwardToRandom.entityId);
int chosenIdx = new java.util.Random().nextInt(refs.size()); int chosenIdx = ThreadLocalRandom.current().nextInt(refs.size());
new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message); new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message);
return this; return this;
} }
@ -199,7 +204,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
private Behavior<Command> onForwardToAll(ForwardToAll forwardToAll) { private Behavior<Command> onForwardToAll(ForwardToAll forwardToAll) {
// #all-entity-refs // #all-entity-refs
Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs = Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToAll.entityId); replicatedSharding.getEntityRefsFor(forwardToAll.entityId);
refs.forEach((replicaId, ref) -> ref.tell(forwardToAll.message)); refs.forEach((replicaId, ref) -> ref.tell(forwardToAll.message));
// #all-entity-refs // #all-entity-refs
return this; return this;

View file

@ -4,6 +4,8 @@
package akka.cluster.sharding.typed package akka.cluster.sharding.typed
import java.util.concurrent.ThreadLocalRandom
import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
@ -23,8 +25,6 @@ import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
import scala.util.Random
object ReplicatedShardingSpec { object ReplicatedShardingSpec {
def config = ConfigFactory.parseString(""" def config = ConfigFactory.parseString("""
akka.loglevel = DEBUG akka.loglevel = DEBUG
@ -52,9 +52,9 @@ class ReplicatedShardingSpec
entityId, entityId,
replicaId, replicaId,
allReplicas, allReplicas,
PersistenceTestKitReadJournal.Identifier) { aaContext => PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, String, Set[String]]( EventSourcedBehavior[Command, String, Set[String]](
aaContext.persistenceId, replicationContext.persistenceId,
Set.empty[String], Set.empty[String],
(state, command) => (state, command) =>
command match { command match {
@ -75,7 +75,7 @@ class ReplicatedShardingSpec
def apply(): Behavior[Command] = Behaviors.setup { context => def apply(): Behavior[Command] = Behaviors.setup { context =>
// #bootstrap // #bootstrap
val aaShardingSettings = val replicatedShardingSettings =
ReplicatedShardingSettings[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( ReplicatedShardingSettings[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
// all replicas // all replicas
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) => Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
@ -93,20 +93,20 @@ class ReplicatedShardingSpec
.withRole(replicaId.id)) .withRole(replicaId.id))
} }
val aaSharding = ReplicatedShardingExtension(context.system).init(aaShardingSettings) val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingSettings)
// #bootstrap // #bootstrap
Behaviors.receiveMessage { Behaviors.receiveMessage {
case ForwardToAll(entityId, cmd) => case ForwardToAll(entityId, cmd) =>
// #all-entity-refs // #all-entity-refs
aaSharding.entityRefsFor(entityId).foreach { replicatedSharding.entityRefsFor(entityId).foreach {
case (_, ref) => ref ! cmd case (_, ref) => ref ! cmd
} }
// #all-entity-refs // #all-entity-refs
Behaviors.same Behaviors.same
case ForwardToRandom(entityId, cmd) => case ForwardToRandom(entityId, cmd) =>
val refs = aaSharding.entityRefsFor(entityId) val refs = replicatedSharding.entityRefsFor(entityId)
val chosenIdx = (new Random()).nextInt(refs.size) val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size)
refs.values.toIndexedSeq(chosenIdx) ! cmd; refs.values.toIndexedSeq(chosenIdx) ! cmd;
Behaviors.same Behaviors.same
} }

View file

@ -9,7 +9,7 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers
@@@ index @@@ index
* [persistence](persistence.md) * [persistence](persistence.md)
* [active-active](replicated-eventsourcing.md) * [replicated-eventsourcing](replicated-eventsourcing.md)
* [cqrs](cqrs.md) * [cqrs](cqrs.md)
* [persistence-style](persistence-style.md) * [persistence-style](persistence-style.md)
* [persistence-snapshot](persistence-snapshot.md) * [persistence-snapshot](persistence-snapshot.md)
@ -20,6 +20,6 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers
* [persistence-query-leveldb](../persistence-query-leveldb.md) * [persistence-query-leveldb](../persistence-query-leveldb.md)
* [persistence-plugins](../persistence-plugins.md) * [persistence-plugins](../persistence-plugins.md)
* [persistence-journals](../persistence-journals.md) * [persistence-journals](../persistence-journals.md)
* [active-active-examples](replicated-eventsourcing-examples.md) * [replicated-eventsourcing-examples](replicated-eventsourcing-examples.md)
@@@ @@@

View file

@ -10,8 +10,8 @@ warning or deprecation period. It is also not recommended to use this module in
@ref[Event sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior` with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state. @ref[Event sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior` with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state.
This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, `EventSourcedBehaviors`s are unavailable. This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, some
`EventSourcedBehavior` actors are unavailable.
Replicated Event Sourcing enables running multiple replicas of each entity. Replicated Event Sourcing enables running multiple replicas of each entity.
There is automatic replication of every event persisted to all replicas. There is automatic replication of every event persisted to all replicas.
@ -31,15 +31,16 @@ However, the event handler must be able to **handle concurrent events** as when
there is no longer the single writer principle as there is with a normal `EventSourcedBehavior`. there is no longer the single writer principle as there is with a normal `EventSourcedBehavior`.
The state of a replicated `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed The state of a replicated `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed
due to network partitions and outages and the event handler and those reading the state must be designed to handle this. due to network partitions and outages, which means that the event handler and those reading the state must be designed
to handle this.
To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support)) To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support)).
## Relaxing the single writer principle for availability ## Relaxing the single writer principle for availability
Taking the example of using Replicated Event Sourcing to run a replica per data center. Taking the example of using Replicated Event Sourcing to run a replica per data center.
When there is no network partitions and no concurrent writes the events stored by a `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state. When there is no network partitions and no concurrent writes the events stored by an `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state.
![images/replicated-events1.png](images/replicated-events1.png) ![images/replicated-events1.png](images/replicated-events1.png)
@ -51,14 +52,14 @@ The event handler logic for applying events to the state of the entity must be a
For example, sometimes it's enough to use application specific timestamps to decide which update should win. For example, sometimes it's enough to use application specific timestamps to decide which update should win.
To assist in implementing the event handler active-active detects these conflicts. To assist in implementing the event handler the Replicated Event Sourcing detects these conflicts.
## API ## API
@scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic. @scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic.
To enable an entity for active-active To enable an entity for Replicated Event Sourcing
replication @java[let it extend `ReplicatedEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @scala[`akka.persistence.typed.scaladsl.ReplicatedEventSourcing`]@java[`akka.persistence.typed.javadsl.ReplicatedEventSourcing`]. @java[let it extend `ReplicatedEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @scala[`akka.persistence.typed.scaladsl.ReplicatedEventSourcing`]@java[`akka.persistence.typed.javadsl.ReplicatedEventSourcing`].
All replicas need to be known up front: All replicas need to be known up front:
@ -79,9 +80,9 @@ Java
The factory takes in: The factory takes in:
* EntityID: this will be used as part of the underlying persistenceId * `entityId`: this will be used as part of the underlying persistenceId
* Replica: Which replica this instance is * `replicaId`: Which replica this instance is
* All Replicas and the query plugin used to read their events * `allReplicasAndQueryPlugins`: All Replicas and the query plugin used to read their events
* A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] * A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`]
In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used. In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used.
@ -97,10 +98,10 @@ Java
The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ReplicationContext] that has the following methods: The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ReplicationContext] that has the following methods:
* entityId * `entityId`
* replicaId * `replicaId`
* allReplicas * `allReplicas`
* persistenceId - to provide to the `EventSourcedBehavior` factory. This **must be used**. * `persistenceId` - to provide to the `EventSourcedBehavior` factory. This **must be used**.
As well as methods that **can only be** used in the event handler. The values these methods return relate to the event that is being processed. As well as methods that **can only be** used in the event handler. The values these methods return relate to the event that is being processed.
@ -113,19 +114,19 @@ concrete `ReplicatedEventSourcedBehavior` and on to the super constructor.
The context gives access to: The context gives access to:
* entityId * `entityId`
* replicaId * `replicaId`
* allReplicas * `allReplicas`
* persistenceId * `persistenceId`
As well as methods that **can only be** used in the event handler, accessed through `getReplicationContext`. The values these methods return relate to the event that is being processed. As well as methods that **can only be** used in the event handler, accessed through `getReplicationContext`. The values these methods return relate to the event that is being processed.
@@@ @@@
* origin: The ReplicaId that originally created the event * `origin`: The ReplicaId that originally created the event
* concurrent: Whether the event was concurrent with another event as in the second diagram above * `concurrent`: Whether the event was concurrent with another event as in the second diagram above
* recoveryRunning: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once. * `recoveryRunning`: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once.
* currentTimeMillis: similar to `System.currentTimeMillis` but guaranteed never to go backwards * `currentTimeMillis`: similar to `System.currentTimeMillis` but guaranteed never to go backwards
The factory returns a `Behavior` that can be spawned like any other behavior. The factory returns a `Behavior` that can be spawned like any other behavior.
@ -146,11 +147,11 @@ Sometimes it is enough to use timestamps to decide which update should win. Such
There is a small utility class @apidoc[LwwTime] that can be useful for implementing last writer wins semantics. There is a small utility class @apidoc[LwwTime] that can be useful for implementing last writer wins semantics.
It contains a timestamp representing current time when the event was persisted and an identifier of the It contains a timestamp representing current time when the event was persisted and an identifier of the
replica that persisted it. When comparing two @apidoc[LwwTime] the greatest timestamp wins. The replica replica that persisted it. When comparing two @apidoc[LwwTime] the greatest timestamp wins. The replica
identifier is used if the two timestamps are equal, and then the one from the data center sorted first in identifier is used if the two timestamps are equal, and then the one from the `replicaId` sorted first in
alphanumeric order wins. alphanumeric order wins.
The nature of last writer wins means that if you only have one timestamp for the state the events must represent an The nature of last writer wins means that if you only have one timestamp for the state the events must represent an
update of the full state. Otherwise, there is a risk that the state in different data centers will be different and update of the full state. Otherwise, there is a risk that the state in different replicas will be different and
not eventually converge. not eventually converge.
An example of that would be an entity representing a blog post and the fields `author` and `title` could be updated An example of that would be an entity representing a blog post and the fields `author` and `title` could be updated
@ -201,7 +202,7 @@ You dont have to read this section to be able to use the feature, but to use
### Causal deliver order ### Causal deliver order
Causal delivery order means that events persisted in one data center are read in the same order in other data centers. The order of concurrent events is undefined, which should be no problem Causal delivery order means that events persisted in one replica are read in the same order in other replicas. The order of concurrent events is undefined, which should be no problem
when using [CRDT's](#conflict-free-replicated-data-types) when using [CRDT's](#conflict-free-replicated-data-types)
and otherwise will be detected via the `ReplicationContext` concurrent method. and otherwise will be detected via the `ReplicationContext` concurrent method.
@ -213,7 +214,7 @@ DC-2: read e1, write e2
DC-1: read e2, write e3 DC-1: read e2, write e3
``` ```
In the above example the causality is `e1 -> e2 -> e3`. Also in a third data center DC-3 these events will be read in the same order e1, e2, e3. In the above example the causality is `e1 -> e2 -> e3`. Also in a third replica DC-3 these events will be read in the same order e1, e2, e3.
Another example with concurrent events: Another example with concurrent events:
@ -227,7 +228,7 @@ DC2: read e3
e2 and e3 are concurrent, i.e. they don't have a causal relation: DC1 sees them in the order "e1, e3, e2", while DC2 sees them as "e1, e2, e3". e2 and e3 are concurrent, i.e. they don't have a causal relation: DC1 sees them in the order "e1, e3, e2", while DC2 sees them as "e1, e2, e3".
A third data center may also see the events as either "e1, e3, e2" or as "e1, e2, e3". A third replica may also see the events as either "e1, e3, e2" or as "e1, e2, e3".
### Concurrent updates ### Concurrent updates
@ -239,9 +240,9 @@ Each replica "owns" a slot in the version vector and increases its counter when
When comparing two version vectors `v1` and `v2`: When comparing two version vectors `v1` and `v2`:
* `v1` is SAME as `v2` iff for all i v1(i) == v2(i) * `v1` is SAME as `v2` iff for all i `v1(i) == v2(i)`
* `v1`is BEFORE `v2` iff for all i v1(i) <= v2(i) and there exist a j such that v1(j) < v2(j) * `v1`is BEFORE `v2` iff for all i `v1(i) <= v2(i)` and there exist a j such that `v1(j) < v2(j)`
* `v1`is AFTER `v2` iff for all i v1(i) >= v2(i) and there exist a j such that v1(j) > v2(j) * `v1`is AFTER `v2` iff for all i `v1(i) >= v2(i)` and there exist a j such that `v1(j) > v2(j)`
* `v1`is CONCURRENT with `v2` otherwise * `v1`is CONCURRENT with `v2` otherwise
@ -276,7 +277,7 @@ More advanced routing among the replicas is currently left as an exercise for th
Just like for regular `EventSourcedBehavior`s it is possible to tag events along with persisting them. Just like for regular `EventSourcedBehavior`s it is possible to tag events along with persisting them.
This is useful for later retrival of events for a given tag. The same @ref[API for tagging provided for EventSourcedBehavior](persistence.md#tagging) can This is useful for later retrival of events for a given tag. The same @ref[API for tagging provided for EventSourcedBehavior](persistence.md#tagging) can
be used for replicated event sourced behaviors as well. be used for replicated event sourced behaviors as well.
Tagging is useful in practice to build queries that lead to other data representations or aggregations of the these event Tagging is useful in practice to build queries that lead to other data representations or aggregations of these event
streams that can more directly serve user queries known as building the “read side” in @ref[CQRS](cqrs.md) based applications. streams that can more directly serve user queries known as building the “read side” in @ref[CQRS](cqrs.md) based applications.
Creating read side projections is possible through [Akka Projection](https://doc.akka.io/docs/akka-projection/current/) Creating read side projections is possible through [Akka Projection](https://doc.akka.io/docs/akka-projection/current/)
@ -325,10 +326,14 @@ to fast forward the stream of events for the origin replica. (With additional po
## Hot Standby ## Hot Standby
If all writes occur to one replica the other replicas are not started there might be many replicated events to catch up with when they are later started. Therefore it can be good to activate all replicas when there is some activity. If all writes occur to one replica and the other replicas are not started there might be many replicated events to catch up with when they are later started. Therefore it can be good to activate all replicas when there is some activity.
This can be achieved automatically when `ReplicatedSharding` is used and direct replication of events is enabled as described in @ref[Direct Replication of Events](#direct-replication-of-events). When each written event is forwarded to the other replicas it will trigger them to start if they are not already started. This can be achieved automatically when `ReplicatedSharding` is used and direct replication of events is enabled as described in @ref[Direct Replication of Events](#direct-replication-of-events). When each written event is forwarded to the other replicas it will trigger them to start if they are not already started.
## Examples
More examples can be found in @ref[Replicated Event Sourcing Examples](./replicated-eventsourcing-examples.md)
## Journal Support ## Journal Support
For a journal plugin to support replication it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr] For a journal plugin to support replication it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr]

View file

@ -36,9 +36,9 @@ object MultiJournalReplicationSpec {
entityId, entityId,
ReplicaId(replicaId), ReplicaId(replicaId),
Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))( Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))(
aaContext => replicationContext =>
EventSourcedBehavior[Command, String, Set[String]]( EventSourcedBehavior[Command, String, Set[String]](
aaContext.persistenceId, replicationContext.persistenceId,
Set.empty[String], Set.empty[String],
(state, command) => (state, command) =>
command match { command match {

View file

@ -33,9 +33,9 @@ object ReplicatedEventPublishingSpec {
replicaId, replicaId,
allReplicas, allReplicas,
PersistenceTestKitReadJournal.Identifier)( PersistenceTestKitReadJournal.Identifier)(
aactx => replicationctx =>
EventSourcedBehavior[Command, String, Set[String]]( EventSourcedBehavior[Command, String, Set[String]](
aactx.persistenceId, replicationctx.persistenceId,
Set.empty, Set.empty,
(state, command) => (state, command) =>
command match { command match {

View file

@ -36,10 +36,10 @@ object ReplicatedEventSourcingSpec {
testBehavior(entityId, replicaId, Some(probe)) testBehavior(entityId, replicaId, Some(probe))
def eventSourcedBehavior( def eventSourcedBehavior(
aaContext: ReplicationContext, replicationContext: ReplicationContext,
probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = { probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = {
EventSourcedBehavior[Command, String, State]( EventSourcedBehavior[Command, String, State](
aaContext.persistenceId, replicationContext.persistenceId,
State(Nil), State(Nil),
(state, command) => (state, command) =>
command match { command match {
@ -47,7 +47,7 @@ object ReplicatedEventSourcingSpec {
replyTo ! state replyTo ! state
Effect.none Effect.none
case GetReplica(replyTo) => case GetReplica(replyTo) =>
replyTo.tell((aaContext.replicaId, aaContext.allReplicas)) replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas))
Effect.none Effect.none
case StoreMe(evt, ack) => case StoreMe(evt, ack) =>
Effect.persist(evt).thenRun(_ => ack ! Done) Effect.persist(evt).thenRun(_ => ack ! Done)
@ -57,7 +57,12 @@ object ReplicatedEventSourcingSpec {
Effect.stop() Effect.stop()
}, },
(state, event) => { (state, event) => {
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent)) probe.foreach(
_ ! EventAndContext(
event,
replicationContext.origin,
replicationContext.recoveryRunning,
replicationContext.concurrent))
state.copy(all = event :: state.all) state.copy(all = event :: state.all)
}) })
} }
@ -70,7 +75,7 @@ object ReplicatedEventSourcingSpec {
entityId, entityId,
ReplicaId(replicaId), ReplicaId(replicaId),
AllReplicas, AllReplicas,
PersistenceTestKitReadJournal.Identifier)(aaContext => eventSourcedBehavior(aaContext, probe)) PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe))
} }

View file

@ -29,15 +29,15 @@ object ReplicationIllegalAccessSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(entityId, replica, AllReplicas, PersistenceTestKitReadJournal.Identifier)( ReplicatedEventSourcing.withSharedJournal(entityId, replica, AllReplicas, PersistenceTestKitReadJournal.Identifier)(
aaContext => replicationContext =>
EventSourcedBehavior[Command, String, State]( EventSourcedBehavior[Command, String, State](
aaContext.persistenceId, replicationContext.persistenceId,
State(Nil), State(Nil),
(_, command) => (_, command) =>
command match { command match {
case AccessInCommandHandler(replyTo) => case AccessInCommandHandler(replyTo) =>
val exception = try { val exception = try {
aaContext.origin replicationContext.origin
None None
} catch { } catch {
case t: Throwable => case t: Throwable =>
@ -48,7 +48,7 @@ object ReplicationIllegalAccessSpec {
case AccessInPersistCallback(replyTo) => case AccessInPersistCallback(replyTo) =>
Effect.persist("cat").thenRun { _ => Effect.persist("cat").thenRun { _ =>
val exception = try { val exception = try {
aaContext.concurrent replicationContext.concurrent
None None
} catch { } catch {
case t: Throwable => case t: Throwable =>
@ -86,8 +86,8 @@ class ReplicationIllegalAccessSpec
"detect illegal access in the factory" in { "detect illegal access in the factory" in {
val exception = intercept[UnsupportedOperationException] { val exception = intercept[UnsupportedOperationException] {
ReplicatedEventSourcing.withSharedJournal("id2", R1, AllReplicas, PersistenceTestKitReadJournal.Identifier) { ReplicatedEventSourcing.withSharedJournal("id2", R1, AllReplicas, PersistenceTestKitReadJournal.Identifier) {
aaContext => replicationContext =>
aaContext.origin replicationContext.origin
??? ???
} }
} }

View file

@ -37,8 +37,8 @@ object ReplicationSnapshotSpec {
entityId, entityId,
replicaId, replicaId,
AllReplicas, AllReplicas,
PersistenceTestKitReadJournal.Identifier)(aaContext => PersistenceTestKitReadJournal.Identifier)(replicationContext =>
eventSourcedBehavior(aaContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0)) eventSourcedBehavior(replicationContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0))
} }
} }

View file

@ -30,10 +30,10 @@ object LwwSpec {
entityId, entityId,
replica, replica,
AllReplicas, AllReplicas,
PersistenceTestKitReadJournal.Identifier) { aaContext => PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, Registry]( EventSourcedBehavior[Command, Event, Registry](
aaContext.persistenceId, replicationContext.persistenceId,
Registry("", LwwTime(Long.MinValue, aaContext.replicaId)), Registry("", LwwTime(Long.MinValue, replicationContext.replicaId)),
(state, command) => (state, command) =>
command match { command match {
case Update(s, timestmap, error) => case Update(s, timestmap, error) =>
@ -41,7 +41,7 @@ object LwwSpec {
error ! "bad value" error ! "bad value"
Effect.none Effect.none
} else { } else {
Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, aaContext.replicaId))) Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, replicationContext.replicaId)))
} }
case Get(replyTo) => case Get(replyTo) =>
replyTo ! state replyTo ! state

View file

@ -31,9 +31,9 @@ object ORSetSpec {
entityId, entityId,
replica, replica,
AllReplicas, AllReplicas,
PersistenceTestKitReadJournal.Identifier) { aaContext => PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]]( EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](
aaContext.persistenceId, replicationContext.persistenceId,
ORSet(replica), ORSet(replica),
(state, command) => (state, command) =>
command match { command match {

View file

@ -104,7 +104,7 @@ object ReplicatedAuctionExampleSpec {
//#setup //#setup
//#command-handler //#command-handler
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ReplicationContext)( def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], replicationContext: ReplicationContext)(
state: AuctionState, state: AuctionState,
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = { command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
state.phase match { state.phase match {
@ -118,12 +118,12 @@ object ReplicatedAuctionExampleSpec {
Effect.none Effect.none
case Finish => case Finish =>
ctx.log.info("Finish") ctx.log.info("Finish")
Effect.persist(AuctionFinished(aaContext.replicaId)) Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close => case Close =>
ctx.log.info("Close") ctx.log.info("Close")
require(shouldClose(setup, state)) require(shouldClose(setup, state))
// TODO send email (before or after persisting) // TODO send email (before or after persisting)
Effect.persist(WinnerDecided(aaContext.replicaId, state.highestBid, state.highestCounterOffer)) Effect.persist(WinnerDecided(replicationContext.replicaId, state.highestBid, state.highestCounterOffer))
case _: OfferBid => case _: OfferBid =>
// auction finished, no more bids accepted // auction finished, no more bids accepted
Effect.unhandled Effect.unhandled
@ -133,12 +133,16 @@ object ReplicatedAuctionExampleSpec {
case OfferBid(bidder, offer) => case OfferBid(bidder, offer) =>
Effect.persist( Effect.persist(
BidRegistered( BidRegistered(
Bid(bidder, offer, Instant.ofEpochMilli(aaContext.currentTimeMillis()), aaContext.replicaId))) Bid(
bidder,
offer,
Instant.ofEpochMilli(replicationContext.currentTimeMillis()),
replicationContext.replicaId)))
case GetHighestBid(replyTo) => case GetHighestBid(replyTo) =>
replyTo ! state.highestBid replyTo ! state.highestBid
Effect.none Effect.none
case Finish => case Finish =>
Effect.persist(AuctionFinished(aaContext.replicaId)) Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close => case Close =>
ctx.log.warn("Premature close") ctx.log.warn("Premature close")
// Close should only be triggered when we have already finished // Close should only be triggered when we have already finished
@ -166,14 +170,14 @@ object ReplicatedAuctionExampleSpec {
} }
//#event-handler //#event-handler
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ReplicationContext, setup: AuctionSetup)( def eventHandler(ctx: ActorContext[AuctionCommand], replicationCtx: ReplicationContext, setup: AuctionSetup)(
state: AuctionState, state: AuctionState,
event: AuctionEvent): AuctionState = { event: AuctionEvent): AuctionState = {
val newState = state.applyEvent(event) val newState = state.applyEvent(event)
ctx.log.infoN("Applying event {}. New start {}", event, newState) ctx.log.infoN("Applying event {}. New start {}", event, newState)
if (!aaCtx.recoveryRunning) { if (!replicationCtx.recoveryRunning) {
eventTriggers(setup, ctx, aaCtx, event, newState) eventTriggers(setup, ctx, replicationCtx, event, newState)
} }
newState newState
@ -184,7 +188,7 @@ object ReplicatedAuctionExampleSpec {
private def eventTriggers( private def eventTriggers(
setup: AuctionSetup, setup: AuctionSetup,
ctx: ActorContext[AuctionCommand], ctx: ActorContext[AuctionCommand],
aaCtx: ReplicationContext, replicationCtx: ReplicationContext,
event: AuctionEvent, event: AuctionEvent,
newState: AuctionState) = { newState: AuctionState) = {
event match { event match {
@ -195,7 +199,7 @@ object ReplicatedAuctionExampleSpec {
"AuctionFinished at {}, already finished at [{}]", "AuctionFinished at {}, already finished at [{}]",
finished.atReplica, finished.atReplica,
alreadyFinishedAtDc.mkString(", ")) alreadyFinishedAtDc.mkString(", "))
if (alreadyFinishedAtDc(aaCtx.replicaId)) { if (alreadyFinishedAtDc(replicationCtx.replicaId)) {
if (shouldClose(setup, newState)) ctx.self ! Close if (shouldClose(setup, newState)) ctx.self ! Close
} else { } else {
ctx.log.info("Sending finish to self") ctx.log.info("Sending finish to self")
@ -215,12 +219,13 @@ object ReplicatedAuctionExampleSpec {
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx => ctx =>
ReplicatedEventSourcing ReplicatedEventSourcing
.withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { aaCtx => .withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) {
replicationCtx =>
EventSourcedBehavior( EventSourcedBehavior(
aaCtx.persistenceId, replicationCtx.persistenceId,
initialState(setup), initialState(setup),
commandHandler(setup, ctx, aaCtx), commandHandler(setup, ctx, replicationCtx),
eventHandler(ctx, aaCtx, setup)) eventHandler(ctx, replicationCtx, setup))
} }
} }
} }

View file

@ -55,27 +55,27 @@ class ReplicatedBlogExampleSpec
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
def behavior(aa: ReplicationContext, ctx: ActorContext[BlogCommand]) = def behavior(replicationContext: ReplicationContext, ctx: ActorContext[BlogCommand]) =
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
aa.persistenceId, replicationContext.persistenceId,
emptyState, emptyState,
(state, cmd) => (state, cmd) =>
cmd match { cmd match {
case AddPost(_, content, replyTo) => case AddPost(_, content, replyTo) =>
val evt = val evt =
PostAdded( PostAdded(
aa.persistenceId.id, replicationContext.persistenceId.id,
content, content,
state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))
Effect.persist(evt).thenRun { _ => Effect.persist(evt).thenRun { _ =>
replyTo ! AddPostDone(aa.entityId) replyTo ! AddPostDone(replicationContext.entityId)
} }
case ChangeBody(_, newContent, replyTo) => case ChangeBody(_, newContent, replyTo) =>
val evt = val evt =
BodyChanged( BodyChanged(
aa.persistenceId.id, replicationContext.persistenceId.id,
newContent, newContent,
state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))
Effect.persist(evt).thenRun { _ => Effect.persist(evt).thenRun { _ =>
replyTo ! Done replyTo ! Done
} }
@ -89,7 +89,7 @@ class ReplicatedBlogExampleSpec
Effect.none Effect.none
}, },
(state, event) => { (state, event) => {
ctx.log.info(s"${aa.entityId}:${aa.replicaId} Received event $event") ctx.log.info(s"${replicationContext.entityId}:${replicationContext.replicaId} Received event $event")
event match { event match {
case PostAdded(_, content, timestamp) => case PostAdded(_, content, timestamp) =>
if (timestamp.isAfter(state.contentTimestamp)) { if (timestamp.isAfter(state.contentTimestamp)) {
@ -118,8 +118,8 @@ class ReplicatedBlogExampleSpec
"cat", "cat",
ReplicaId("DC-A"), ReplicaId("DC-A"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")), Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) => PersistenceTestKitReadJournal.Identifier) { replicationContext =>
behavior(aa, ctx) behavior(replicationContext, ctx)
} }
}, },
"dc-a") "dc-a")
@ -131,8 +131,8 @@ class ReplicatedBlogExampleSpec
"cat", "cat",
ReplicaId("DC-B"), ReplicaId("DC-B"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")), Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) => PersistenceTestKitReadJournal.Identifier) { replicationContext =>
behavior(aa, ctx) behavior(replicationContext, ctx)
} }
}, },
"dc-b") "dc-b")

View file

@ -123,7 +123,7 @@ private[akka] final class ReplayingEvents[C, E, S](
eventForErrorReporting = OptionVal.Some(event) eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr) state = state.copy(seqNr = repr.sequenceNr)
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] = val replicatedMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] =
setup.replication match { setup.replication match {
case Some(replication) => case Some(replication) =>
val meta = repr.metadata match { val meta = repr.metadata match {
@ -146,17 +146,17 @@ private[akka] final class ReplayingEvents[C, E, S](
case None => case None =>
} }
aaMetaAndSelfReplica match { replicatedMetaAndSelfReplica match {
case Some((meta, selfReplica, aa)) if meta.originReplica != selfReplica => case Some((meta, selfReplica, replication)) if meta.originReplica != selfReplica =>
// keep track of highest origin seqnr per other replica // keep track of highest origin seqnr per other replica
state = state.copy( state = state.copy(
state = newState, state = newState,
eventSeenInInterval = true, eventSeenInInterval = true,
version = meta.version, version = meta.version,
seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr)) seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr))
aa.clearContext() replication.clearContext()
case Some((_, _, aa)) => case Some((_, _, replication)) =>
aa.clearContext() replication.clearContext()
state = state.copy(state = newState, eventSeenInInterval = true) state = state.copy(state = newState, eventSeenInInterval = true)
case _ => case _ =>
state = state.copy(state = newState, eventSeenInInterval = true) state = state.copy(state = newState, eventSeenInInterval = true)

View file

@ -87,7 +87,7 @@ private[akka] final class ReplicationContextImpl(
private[akka] final case class ReplicationSetup( private[akka] final case class ReplicationSetup(
replicaId: ReplicaId, replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String], allReplicasAndQueryPlugins: Map[ReplicaId, String],
aaContext: ReplicationContextImpl) { replicationContext: ReplicationContextImpl) {
val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet
@ -95,17 +95,17 @@ private[akka] final case class ReplicationSetup(
* Must only be called on the same thread that will execute the user code * Must only be called on the same thread that will execute the user code
*/ */
def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = { def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = {
aaContext._currentThread = OptionVal.Some(Thread.currentThread()) replicationContext._currentThread = OptionVal.Some(Thread.currentThread())
aaContext._recoveryRunning = recoveryRunning replicationContext._recoveryRunning = recoveryRunning
aaContext._concurrent = concurrent replicationContext._concurrent = concurrent
aaContext._origin = OptionVal.Some(originReplica) replicationContext._origin = OptionVal.Some(originReplica)
} }
def clearContext(): Unit = { def clearContext(): Unit = {
aaContext._currentThread = OptionVal.None replicationContext._currentThread = OptionVal.None
aaContext._recoveryRunning = false replicationContext._recoveryRunning = false
aaContext._concurrent = false replicationContext._concurrent = false
aaContext._origin = OptionVal.None replicationContext._origin = OptionVal.None
} }
} }

View file

@ -111,7 +111,7 @@ private[akka] object Running {
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
val initialState = setup.replication match { val initialState = setup.replication match {
case Some(aa) => startReplicationStream(setup, state, aa) case Some(replication) => startReplicationStream(setup, state, replication)
case None => state case None => state
} }
new running.HandlingCommands(initialState) new running.HandlingCommands(initialState)
@ -128,7 +128,7 @@ private[akka] object Running {
val query = PersistenceQuery(system) val query = PersistenceQuery(system)
replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) => replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != replicationSetup.replicaId) { if (replicaId != replicationSetup.replicaId) {
val pid = PersistenceId.replicatedUniqueId(replicationSetup.aaContext.entityId, replicaId) val pid = PersistenceId.replicatedUniqueId(replicationSetup.replicationContext.entityId, replicaId)
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId) val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId) val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
@ -458,8 +458,8 @@ private[akka] object Running {
val eventAdapterManifest = setup.eventAdapter.manifest(event) val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.replication match { val newState2 = setup.replication match {
case Some(aa) => case Some(replication) =>
val updatedVersion = stateAfterApply.version.updated(aa.replicaId.id, _currentSequenceNumber) val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, _currentSequenceNumber)
val r = internalPersist( val r = internalPersist(
setup.context, setup.context,
cmd, cmd,
@ -467,8 +467,11 @@ private[akka] object Running {
eventToPersist, eventToPersist,
eventAdapterManifest, eventAdapterManifest,
OptionVal.Some( OptionVal.Some(
ReplicatedEventMetadata(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) ReplicatedEventMetadata(
.copy(version = updatedVersion) replication.replicaId,
_currentSequenceNumber,
updatedVersion,
concurrent = false))).copy(version = updatedVersion)
if (setup.log.isTraceEnabled()) if (setup.log.isTraceEnabled())
setup.log.traceN( setup.log.traceN(
@ -501,9 +504,9 @@ private[akka] object Running {
_currentSequenceNumber = state.seqNr _currentSequenceNumber = state.seqNr
val metadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match { val metadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match {
case Some(aa) => case Some(replication) =>
aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent replication.setContext(recoveryRunning = false, replication.replicaId, concurrent = false) // local events are never concurrent
Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later Some(ReplicatedEventMetadata(replication.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later
case None => None case None => None
} }
@ -685,7 +688,8 @@ private[akka] object Running {
onWriteSuccess(setup.context, p) onWriteSuccess(setup.context, p)
if (setup.publishEvents) { if (setup.publishEvents) {
val meta = setup.replication.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version)) val meta = setup.replication.map(replication =>
new ReplicatedPublishedEventMetaData(replication.replicaId, state.version))
context.system.eventStream ! EventStream.Publish( context.system.eventStream ! EventStream.Publish(
PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta)) PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta))
} }