From cd821fe3f8d428b2b36c74b82bedbe1ff2ba53a0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 4 Aug 2020 08:12:45 +0200 Subject: [PATCH] Minor adjustments to replicated es docs (#29450) * Minor adjustments to replicated es docs * and a few aa leftovers * link to more examples --- .../typed/ReplicatedShardingTest.java | 17 +++-- .../typed/ReplicatedShardingSpec.scala | 18 ++--- .../main/paradox/typed/index-persistence.md | 4 +- .../paradox/typed/replicated-eventsourcing.md | 71 ++++++++++--------- .../typed/MultiJournalReplicationSpec.scala | 4 +- .../typed/ReplicatedEventPublishingSpec.scala | 4 +- .../typed/ReplicatedEventSourcingSpec.scala | 15 ++-- .../typed/ReplicationIllegalAccessSpec.scala | 12 ++-- .../typed/ReplicationSnapshotSpec.scala | 4 +- .../akka/persistence/typed/crdt/LwwSpec.scala | 8 +-- .../persistence/typed/crdt/ORSetSpec.scala | 4 +- .../typed/ReplicatedAuctionExampleSpec.scala | 37 +++++----- .../typed/ReplicatedBlogExampleSpec.scala | 24 +++---- .../typed/internal/ReplayingEvents.scala | 12 ++-- .../typed/internal/ReplicationSetup.scala | 18 ++--- .../persistence/typed/internal/Running.scala | 26 ++++--- 16 files changed, 151 insertions(+), 127 deletions(-) diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java index d9cd4dcab9..eeab3cfb16 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -144,7 +145,7 @@ public class ReplicatedShardingTest extends JUnitSuite { private final ReplicatedSharding< MyReplicatedStringSet.Command, ShardingEnvelope> - aaSharding; + replicatedSharding; private ProxyActor(ActorContext context) { super(context); @@ -152,7 +153,7 @@ public class ReplicatedShardingTest extends JUnitSuite { // #bootstrap ReplicatedShardingSettings< MyReplicatedStringSet.Command, ShardingEnvelope> - aaShardingSettings = + replicatedShardingSettings = ReplicatedShardingSettings.create( MyReplicatedStringSet.Command.class, ALL_REPLICAS, @@ -176,8 +177,12 @@ public class ReplicatedShardingTest extends JUnitSuite { ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(getContext().getSystem()); - aaSharding = extension.init(aaShardingSettings); + ReplicatedSharding< + MyReplicatedStringSet.Command, ShardingEnvelope> + replicatedSharding = extension.init(replicatedShardingSettings); // #bootstrap + + this.replicatedSharding = replicatedSharding; } @Override @@ -190,8 +195,8 @@ public class ReplicatedShardingTest extends JUnitSuite { private Behavior onForwardToRandom(ForwardToRandom forwardToRandom) { Map> refs = - aaSharding.getEntityRefsFor(forwardToRandom.entityId); - int chosenIdx = new java.util.Random().nextInt(refs.size()); + replicatedSharding.getEntityRefsFor(forwardToRandom.entityId); + int chosenIdx = ThreadLocalRandom.current().nextInt(refs.size()); new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message); return this; } @@ -199,7 +204,7 @@ public class ReplicatedShardingTest extends JUnitSuite { private Behavior onForwardToAll(ForwardToAll forwardToAll) { // 
#all-entity-refs Map> refs = - aaSharding.getEntityRefsFor(forwardToAll.entityId); + replicatedSharding.getEntityRefsFor(forwardToAll.entityId); refs.forEach((replicaId, ref) -> ref.tell(forwardToAll.message)); // #all-entity-refs return this; diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index d87a13673f..90d4d5fa22 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -4,6 +4,8 @@ package akka.cluster.sharding.typed +import java.util.concurrent.ThreadLocalRandom + import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef @@ -23,8 +25,6 @@ import akka.serialization.jackson.CborSerializable import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike -import scala.util.Random - object ReplicatedShardingSpec { def config = ConfigFactory.parseString(""" akka.loglevel = DEBUG @@ -52,9 +52,9 @@ class ReplicatedShardingSpec entityId, replicaId, allReplicas, - PersistenceTestKitReadJournal.Identifier) { aaContext => + PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, String, Set[String]]( - aaContext.persistenceId, + replicationContext.persistenceId, Set.empty[String], (state, command) => command match { @@ -75,7 +75,7 @@ class ReplicatedShardingSpec def apply(): Behavior[Command] = Behaviors.setup { context => // #bootstrap - val aaShardingSettings = + val replicatedShardingSettings = ReplicatedShardingSettings[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( // all replicas Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) => @@ -93,20 +93,20 @@ class ReplicatedShardingSpec .withRole(replicaId.id)) } - val aaSharding = ReplicatedShardingExtension(context.system).init(aaShardingSettings) + val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingSettings) // #bootstrap Behaviors.receiveMessage { case ForwardToAll(entityId, cmd) => // #all-entity-refs - aaSharding.entityRefsFor(entityId).foreach { + replicatedSharding.entityRefsFor(entityId).foreach { case (_, ref) => ref ! cmd } // #all-entity-refs Behaviors.same case ForwardToRandom(entityId, cmd) => - val refs = aaSharding.entityRefsFor(entityId) - val chosenIdx = (new Random()).nextInt(refs.size) + val refs = replicatedSharding.entityRefsFor(entityId) + val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size) refs.values.toIndexedSeq(chosenIdx) ! 
cmd; Behaviors.same } diff --git a/akka-docs/src/main/paradox/typed/index-persistence.md b/akka-docs/src/main/paradox/typed/index-persistence.md index 2a7fd09aa5..739a8d2349 100644 --- a/akka-docs/src/main/paradox/typed/index-persistence.md +++ b/akka-docs/src/main/paradox/typed/index-persistence.md @@ -9,7 +9,7 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers @@@ index * [persistence](persistence.md) -* [active-active](replicated-eventsourcing.md) +* [replicated-eventsourcing](replicated-eventsourcing.md) * [cqrs](cqrs.md) * [persistence-style](persistence-style.md) * [persistence-snapshot](persistence-snapshot.md) @@ -20,6 +20,6 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers * [persistence-query-leveldb](../persistence-query-leveldb.md) * [persistence-plugins](../persistence-plugins.md) * [persistence-journals](../persistence-journals.md) -* [active-active-examples](replicated-eventsourcing-examples.md) +* [replicated-eventsourcing-examples](replicated-eventsourcing-examples.md) @@@ diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index b0f555141b..66e8932173 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -10,8 +10,8 @@ warning or deprecation period. It is also not recommended to use this module in @ref[Event sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior` with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state. -This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, `EventSourcedBehaviors`s are unavailable. - +This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, some +`EventSourcedBehavior` actors are unavailable. Replicated Event Sourcing enables running multiple replicas of each entity. There is automatic replication of every event persisted to all replicas. @@ -31,15 +31,16 @@ However, the event handler must be able to **handle concurrent events** as when there is no longer the single writer principle as there is with a normal `EventSourcedBehavior`. The state of a replicated `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed -due to network partitions and outages and the event handler and those reading the state must be designed to handle this. +due to network partitions and outages, which means that the event handler and those reading the state must be designed +to handle this. -To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support)) +To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support)). ## Relaxing the single writer principle for availability Taking the example of using Replicated Event Sourcing to run a replica per data center. 
-When there is no network partitions and no concurrent writes the events stored by a `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state.
+When there are no network partitions and no concurrent writes, the events stored by an `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state.
 
 ![images/replicated-events1.png](images/replicated-events1.png)
 
@@ -51,14 +52,14 @@ The event handler logic for applying events to the state of the entity must be a
 
 For example, sometimes it's enough to use application specific timestamps to decide which update should win.
 
-To assist in implementing the event handler active-active detects these conflicts.
+To assist in implementing the event handler, Replicated Event Sourcing detects these conflicts.
 
 ## API
 
 @scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic.
 
-To enable an entity for active-active
-replication @java[let it extend `ReplicatedEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @scala[`akka.persistence.typed.scaladsl.ReplicatedEventSourcing`]@java[`akka.persistence.typed.javadsl.ReplicatedEventSourcing`].
+To enable an entity for Replicated Event Sourcing
+@java[let it extend `ReplicatedEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @scala[`akka.persistence.typed.scaladsl.ReplicatedEventSourcing`]@java[`akka.persistence.typed.javadsl.ReplicatedEventSourcing`].
 
 All replicas need to be known up front:
 
@@ -79,9 +80,9 @@ Java
 
 The factory takes in:
 
-* EntityID: this will be used as part of the underlying persistenceId
-* Replica: Which replica this instance is
-* All Replicas and the query plugin used to read their events
+* `entityId`: this will be used as part of the underlying persistenceId
+* `replicaId`: which replica this instance is
+* `allReplicasAndQueryPlugins`: all replicas and the query plugin used to read their events
 * A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`]
 
 In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used.
@@ -97,10 +98,10 @@ Java
 
 The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ReplicationContext] that has the following methods:
 
-* entityId
-* replicaId
-* allReplicas
-* persistenceId - to provide to the `EventSourcedBehavior` factory. This **must be used**.
+* `entityId`
+* `replicaId`
+* `allReplicas`
+* `persistenceId` - to provide to the `EventSourcedBehavior` factory. This **must be used**.
 
 As well as methods that **can only be** used in the event handler. The values these methods return relate to the event that is being processed.
 
@@ -113,19 +114,19 @@ concrete `ReplicatedEventSourcedBehavior` and on to the super constructor.
The context gives access to: -* entityId -* replicaId -* allReplicas -* persistenceId +* `entityId` +* `replicaId` +* `allReplicas` +* `persistenceId` As well as methods that **can only be** used in the event handler, accessed through `getReplicationContext`. The values these methods return relate to the event that is being processed. @@@ -* origin: The ReplicaId that originally created the event -* concurrent: Whether the event was concurrent with another event as in the second diagram above -* recoveryRunning: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once. -* currentTimeMillis: similar to `System.currentTimeMillis` but guaranteed never to go backwards +* `origin`: The ReplicaId that originally created the event +* `concurrent`: Whether the event was concurrent with another event as in the second diagram above +* `recoveryRunning`: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once. +* `currentTimeMillis`: similar to `System.currentTimeMillis` but guaranteed never to go backwards The factory returns a `Behavior` that can be spawned like any other behavior. @@ -146,11 +147,11 @@ Sometimes it is enough to use timestamps to decide which update should win. Such There is a small utility class @apidoc[LwwTime] that can be useful for implementing last writer wins semantics. It contains a timestamp representing current time when the event was persisted and an identifier of the replica that persisted it. When comparing two @apidoc[LwwTime] the greatest timestamp wins. The replica -identifier is used if the two timestamps are equal, and then the one from the data center sorted first in +identifier is used if the two timestamps are equal, and then the one from the `replicaId` sorted first in alphanumeric order wins. The nature of last writer wins means that if you only have one timestamp for the state the events must represent an -update of the full state. Otherwise, there is a risk that the state in different data centers will be different and +update of the full state. Otherwise, there is a risk that the state in different replicas will be different and not eventually converge. An example of that would be an entity representing a blog post and the fields `author` and `title` could be updated @@ -201,7 +202,7 @@ You don’t have to read this section to be able to use the feature, but to use ### Causal deliver order -Causal delivery order means that events persisted in one data center are read in the same order in other data centers. The order of concurrent events is undefined, which should be no problem +Causal delivery order means that events persisted in one replica are read in the same order in other replicas. The order of concurrent events is undefined, which should be no problem when using [CRDT's](#conflict-free-replicated-data-types) and otherwise will be detected via the `ReplicationContext` concurrent method. @@ -213,7 +214,7 @@ DC-2: read e1, write e2 DC-1: read e2, write e3 ``` -In the above example the causality is `e1 -> e2 -> e3`. Also in a third data center DC-3 these events will be read in the same order e1, e2, e3. +In the above example the causality is `e1 -> e2 -> e3`. Also in a third replica DC-3 these events will be read in the same order e1, e2, e3. Another example with concurrent events: @@ -227,7 +228,7 @@ DC2: read e3 e2 and e3 are concurrent, i.e. 
they don't have a causal relation: DC1 sees them in the order "e1, e3, e2", while DC2 sees them as "e1, e2, e3".
-A third data center may also see the events as either "e1, e3, e2" or as "e1, e2, e3".
+A third replica may also see the events as either "e1, e3, e2" or as "e1, e2, e3".
 
 ### Concurrent updates
 
@@ -239,9 +240,9 @@ Each replica "owns" a slot in the version vector and increases its counter when
 
 When comparing two version vectors `v1` and `v2`:
 
-* `v1` is SAME as `v2` iff for all i v1(i) == v2(i)
-* `v1`is BEFORE `v2` iff for all i v1(i) <= v2(i) and there exist a j such that v1(j) < v2(j)
-* `v1`is AFTER `v2` iff for all i v1(i) >= v2(i) and there exist a j such that v1(j) > v2(j)
+* `v1` is SAME as `v2` iff for all i `v1(i) == v2(i)`
+* `v1` is BEFORE `v2` iff for all i `v1(i) <= v2(i)` and there exists a j such that `v1(j) < v2(j)`
+* `v1` is AFTER `v2` iff for all i `v1(i) >= v2(i)` and there exists a j such that `v1(j) > v2(j)`
 * `v1`is CONCURRENT with `v2` otherwise
 
 
@@ -276,7 +277,7 @@ More advanced routing among the replicas is currently left as an exercise for th
 Just like for regular `EventSourcedBehavior`s it is possible to tag events along with persisting them. This is useful for later retrival of events for a given tag. The same @ref[API for tagging provided for EventSourcedBehavior](persistence.md#tagging) can be used for replicated event sourced behaviors as well.
-Tagging is useful in practice to build queries that lead to other data representations or aggregations of the these event
+Tagging is useful in practice to build queries that lead to other data representations or aggregations of these event
 streams that can more directly serve user queries – known as building the "read side" in @ref[CQRS](cqrs.md) based
 applications.
 
 Creating read side projections is possible through [Akka Projection](https://doc.akka.io/docs/akka-projection/current/)
@@ -325,10 +326,14 @@ to fast forward the stream of events for the origin replica. (With additional po
 
 ## Hot Standby
 
-If all writes occur to one replica the other replicas are not started there might be many replicated events to catch up with when they are later started. Therefore it can be good to activate all replicas when there is some activity.
+If all writes occur to one replica and the other replicas are not started, there might be many replicated events to catch up with when they are later started. Therefore it can be good to activate all replicas when there is some activity.
 
 This can be achieved automatically when `ReplicatedSharding` is used and direct replication of events is enabled as described in @ref[Direct Replication of Events](#direct-replication-of-events). When each written event is forwarded to the other replicas it will trigger them to start if they are not already started.
+## Examples + +More examples can be found in @ref[Replicated Event Sourcing Examples](./replicated-eventsourcing-examples.md) + ## Journal Support For a journal plugin to support replication it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr] diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala index 8de42eacaf..d155d9d6dc 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala @@ -36,9 +36,9 @@ object MultiJournalReplicationSpec { entityId, ReplicaId(replicaId), Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))( - aaContext => + replicationContext => EventSourcedBehavior[Command, String, Set[String]]( - aaContext.persistenceId, + replicationContext.persistenceId, Set.empty[String], (state, command) => command match { diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala index 90c498efb8..c8a88b947c 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala @@ -33,9 +33,9 @@ object ReplicatedEventPublishingSpec { replicaId, allReplicas, PersistenceTestKitReadJournal.Identifier)( - aactx => + replicationctx => EventSourcedBehavior[Command, String, Set[String]]( - aactx.persistenceId, + replicationctx.persistenceId, Set.empty, (state, command) => command match { diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index 4a4c19766b..beeb5c0863 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -36,10 +36,10 @@ object ReplicatedEventSourcingSpec { testBehavior(entityId, replicaId, Some(probe)) def eventSourcedBehavior( - aaContext: ReplicationContext, + replicationContext: ReplicationContext, probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = { EventSourcedBehavior[Command, String, State]( - aaContext.persistenceId, + replicationContext.persistenceId, State(Nil), (state, command) => command match { @@ -47,7 +47,7 @@ object ReplicatedEventSourcingSpec { replyTo ! state Effect.none case GetReplica(replyTo) => - replyTo.tell((aaContext.replicaId, aaContext.allReplicas)) + replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas)) Effect.none case StoreMe(evt, ack) => Effect.persist(evt).thenRun(_ => ack ! Done) @@ -57,7 +57,12 @@ object ReplicatedEventSourcingSpec { Effect.stop() }, (state, event) => { - probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent)) + probe.foreach( + _ ! 
EventAndContext( + event, + replicationContext.origin, + replicationContext.recoveryRunning, + replicationContext.concurrent)) state.copy(all = event :: state.all) }) } @@ -70,7 +75,7 @@ object ReplicatedEventSourcingSpec { entityId, ReplicaId(replicaId), AllReplicas, - PersistenceTestKitReadJournal.Identifier)(aaContext => eventSourcedBehavior(aaContext, probe)) + PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe)) } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala index 8415b0dcd9..11b28e9f76 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala @@ -29,15 +29,15 @@ object ReplicationIllegalAccessSpec { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal(entityId, replica, AllReplicas, PersistenceTestKitReadJournal.Identifier)( - aaContext => + replicationContext => EventSourcedBehavior[Command, String, State]( - aaContext.persistenceId, + replicationContext.persistenceId, State(Nil), (_, command) => command match { case AccessInCommandHandler(replyTo) => val exception = try { - aaContext.origin + replicationContext.origin None } catch { case t: Throwable => @@ -48,7 +48,7 @@ object ReplicationIllegalAccessSpec { case AccessInPersistCallback(replyTo) => Effect.persist("cat").thenRun { _ => val exception = try { - aaContext.concurrent + replicationContext.concurrent None } catch { case t: Throwable => @@ -86,8 +86,8 @@ class ReplicationIllegalAccessSpec "detect illegal access in the factory" in { val exception = intercept[UnsupportedOperationException] { ReplicatedEventSourcing.withSharedJournal("id2", R1, AllReplicas, PersistenceTestKitReadJournal.Identifier) { - aaContext => - aaContext.origin + replicationContext => + replicationContext.origin ??? 
} } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala index f71b12afbb..d20638c0d1 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala @@ -37,8 +37,8 @@ object ReplicationSnapshotSpec { entityId, replicaId, AllReplicas, - PersistenceTestKitReadJournal.Identifier)(aaContext => - eventSourcedBehavior(aaContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0)) + PersistenceTestKitReadJournal.Identifier)(replicationContext => + eventSourcedBehavior(replicationContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0)) } } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala index 48b300b5cf..f8bf4e4240 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala @@ -30,10 +30,10 @@ object LwwSpec { entityId, replica, AllReplicas, - PersistenceTestKitReadJournal.Identifier) { aaContext => + PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, Event, Registry]( - aaContext.persistenceId, - Registry("", LwwTime(Long.MinValue, aaContext.replicaId)), + replicationContext.persistenceId, + Registry("", LwwTime(Long.MinValue, replicationContext.replicaId)), (state, command) => command match { case Update(s, timestmap, error) => @@ -41,7 +41,7 @@ object LwwSpec { error ! "bad value" Effect.none } else { - Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, aaContext.replicaId))) + Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, replicationContext.replicaId))) } case Get(replyTo) => replyTo ! 
state diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala index e17afe3090..2e02fd65bb 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala @@ -31,9 +31,9 @@ object ORSetSpec { entityId, replica, AllReplicas, - PersistenceTestKitReadJournal.Identifier) { aaContext => + PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]]( - aaContext.persistenceId, + replicationContext.persistenceId, ORSet(replica), (state, command) => command match { diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index f5aaa3f9b2..d0fd54f0f6 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -104,7 +104,7 @@ object ReplicatedAuctionExampleSpec { //#setup //#command-handler - def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ReplicationContext)( + def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], replicationContext: ReplicationContext)( state: AuctionState, command: AuctionCommand): Effect[AuctionEvent, AuctionState] = { state.phase match { @@ -118,12 +118,12 @@ object ReplicatedAuctionExampleSpec { Effect.none case Finish => ctx.log.info("Finish") - Effect.persist(AuctionFinished(aaContext.replicaId)) + Effect.persist(AuctionFinished(replicationContext.replicaId)) case Close => ctx.log.info("Close") require(shouldClose(setup, state)) // TODO send email (before or after persisting) - Effect.persist(WinnerDecided(aaContext.replicaId, state.highestBid, state.highestCounterOffer)) + Effect.persist(WinnerDecided(replicationContext.replicaId, state.highestBid, state.highestCounterOffer)) case _: OfferBid => // auction finished, no more bids accepted Effect.unhandled @@ -133,12 +133,16 @@ object ReplicatedAuctionExampleSpec { case OfferBid(bidder, offer) => Effect.persist( BidRegistered( - Bid(bidder, offer, Instant.ofEpochMilli(aaContext.currentTimeMillis()), aaContext.replicaId))) + Bid( + bidder, + offer, + Instant.ofEpochMilli(replicationContext.currentTimeMillis()), + replicationContext.replicaId))) case GetHighestBid(replyTo) => replyTo ! state.highestBid Effect.none case Finish => - Effect.persist(AuctionFinished(aaContext.replicaId)) + Effect.persist(AuctionFinished(replicationContext.replicaId)) case Close => ctx.log.warn("Premature close") // Close should only be triggered when we have already finished @@ -166,14 +170,14 @@ object ReplicatedAuctionExampleSpec { } //#event-handler - def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ReplicationContext, setup: AuctionSetup)( + def eventHandler(ctx: ActorContext[AuctionCommand], replicationCtx: ReplicationContext, setup: AuctionSetup)( state: AuctionState, event: AuctionEvent): AuctionState = { val newState = state.applyEvent(event) ctx.log.infoN("Applying event {}. 
New start {}", event, newState) - if (!aaCtx.recoveryRunning) { - eventTriggers(setup, ctx, aaCtx, event, newState) + if (!replicationCtx.recoveryRunning) { + eventTriggers(setup, ctx, replicationCtx, event, newState) } newState @@ -184,7 +188,7 @@ object ReplicatedAuctionExampleSpec { private def eventTriggers( setup: AuctionSetup, ctx: ActorContext[AuctionCommand], - aaCtx: ReplicationContext, + replicationCtx: ReplicationContext, event: AuctionEvent, newState: AuctionState) = { event match { @@ -195,7 +199,7 @@ object ReplicatedAuctionExampleSpec { "AuctionFinished at {}, already finished at [{}]", finished.atReplica, alreadyFinishedAtDc.mkString(", ")) - if (alreadyFinishedAtDc(aaCtx.replicaId)) { + if (alreadyFinishedAtDc(replicationCtx.replicaId)) { if (shouldClose(setup, newState)) ctx.self ! Close } else { ctx.log.info("Sending finish to self") @@ -215,12 +219,13 @@ object ReplicatedAuctionExampleSpec { def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { ctx => ReplicatedEventSourcing - .withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { aaCtx => - EventSourcedBehavior( - aaCtx.persistenceId, - initialState(setup), - commandHandler(setup, ctx, aaCtx), - eventHandler(ctx, aaCtx, setup)) + .withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { + replicationCtx => + EventSourcedBehavior( + replicationCtx.persistenceId, + initialState(setup), + commandHandler(setup, ctx, replicationCtx), + eventHandler(ctx, replicationCtx, setup)) } } } diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index 2450592182..e4d7f3488f 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -55,27 +55,27 @@ class ReplicatedBlogExampleSpec implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) - def behavior(aa: ReplicationContext, ctx: ActorContext[BlogCommand]) = + def behavior(replicationContext: ReplicationContext, ctx: ActorContext[BlogCommand]) = EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( - aa.persistenceId, + replicationContext.persistenceId, emptyState, (state, cmd) => cmd match { case AddPost(_, content, replyTo) => val evt = PostAdded( - aa.persistenceId.id, + replicationContext.persistenceId.id, content, - state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) + state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) Effect.persist(evt).thenRun { _ => - replyTo ! AddPostDone(aa.entityId) + replyTo ! AddPostDone(replicationContext.entityId) } case ChangeBody(_, newContent, replyTo) => val evt = BodyChanged( - aa.persistenceId.id, + replicationContext.persistenceId.id, newContent, - state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) + state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) Effect.persist(evt).thenRun { _ => replyTo ! 
Done } @@ -89,7 +89,7 @@ class ReplicatedBlogExampleSpec Effect.none }, (state, event) => { - ctx.log.info(s"${aa.entityId}:${aa.replicaId} Received event $event") + ctx.log.info(s"${replicationContext.entityId}:${replicationContext.replicaId} Received event $event") event match { case PostAdded(_, content, timestamp) => if (timestamp.isAfter(state.contentTimestamp)) { @@ -118,8 +118,8 @@ class ReplicatedBlogExampleSpec "cat", ReplicaId("DC-A"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")), - PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) => - behavior(aa, ctx) + PersistenceTestKitReadJournal.Identifier) { replicationContext => + behavior(replicationContext, ctx) } }, "dc-a") @@ -131,8 +131,8 @@ class ReplicatedBlogExampleSpec "cat", ReplicaId("DC-B"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")), - PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) => - behavior(aa, ctx) + PersistenceTestKitReadJournal.Identifier) { replicationContext => + behavior(replicationContext, ctx) } }, "dc-b") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 1bac5c2683..a9d62c7ba6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -123,7 +123,7 @@ private[akka] final class ReplayingEvents[C, E, S]( eventForErrorReporting = OptionVal.Some(event) state = state.copy(seqNr = repr.sequenceNr) - val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] = + val replicatedMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] = setup.replication match { case Some(replication) => val meta = repr.metadata match { @@ -146,17 +146,17 @@ private[akka] final class ReplayingEvents[C, E, S]( case None => } - aaMetaAndSelfReplica match { - case Some((meta, selfReplica, aa)) if meta.originReplica != selfReplica => + replicatedMetaAndSelfReplica match { + case Some((meta, selfReplica, replication)) if meta.originReplica != selfReplica => // keep track of highest origin seqnr per other replica state = state.copy( state = newState, eventSeenInInterval = true, version = meta.version, seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr)) - aa.clearContext() - case Some((_, _, aa)) => - aa.clearContext() + replication.clearContext() + case Some((_, _, replication)) => + replication.clearContext() state = state.copy(state = newState, eventSeenInInterval = true) case _ => state = state.copy(state = newState, eventSeenInInterval = true) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala index 755201eb63..62b1e49851 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala @@ -87,7 +87,7 @@ private[akka] final class ReplicationContextImpl( private[akka] final case class ReplicationSetup( replicaId: ReplicaId, allReplicasAndQueryPlugins: Map[ReplicaId, String], - aaContext: ReplicationContextImpl) { + replicationContext: ReplicationContextImpl) { val allReplicas: Set[ReplicaId] = 
allReplicasAndQueryPlugins.keySet @@ -95,17 +95,17 @@ private[akka] final case class ReplicationSetup( * Must only be called on the same thread that will execute the user code */ def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = { - aaContext._currentThread = OptionVal.Some(Thread.currentThread()) - aaContext._recoveryRunning = recoveryRunning - aaContext._concurrent = concurrent - aaContext._origin = OptionVal.Some(originReplica) + replicationContext._currentThread = OptionVal.Some(Thread.currentThread()) + replicationContext._recoveryRunning = recoveryRunning + replicationContext._concurrent = concurrent + replicationContext._origin = OptionVal.Some(originReplica) } def clearContext(): Unit = { - aaContext._currentThread = OptionVal.None - aaContext._recoveryRunning = false - aaContext._concurrent = false - aaContext._origin = OptionVal.None + replicationContext._currentThread = OptionVal.None + replicationContext._recoveryRunning = false + replicationContext._concurrent = false + replicationContext._origin = OptionVal.None } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index e0fb66ffe8..05889a0101 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -111,8 +111,8 @@ private[akka] object Running { def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) val initialState = setup.replication match { - case Some(aa) => startReplicationStream(setup, state, aa) - case None => state + case Some(replication) => startReplicationStream(setup, state, replication) + case None => state } new running.HandlingCommands(initialState) } @@ -128,7 +128,7 @@ private[akka] object Running { val query = PersistenceQuery(system) replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) => if (replicaId != replicationSetup.replicaId) { - val pid = PersistenceId.replicatedUniqueId(replicationSetup.aaContext.entityId, replicaId) + val pid = PersistenceId.replicatedUniqueId(replicationSetup.replicationContext.entityId, replicaId) val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId) val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId) @@ -458,8 +458,8 @@ private[akka] object Running { val eventAdapterManifest = setup.eventAdapter.manifest(event) val newState2 = setup.replication match { - case Some(aa) => - val updatedVersion = stateAfterApply.version.updated(aa.replicaId.id, _currentSequenceNumber) + case Some(replication) => + val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, _currentSequenceNumber) val r = internalPersist( setup.context, cmd, @@ -467,8 +467,11 @@ private[akka] object Running { eventToPersist, eventAdapterManifest, OptionVal.Some( - ReplicatedEventMetadata(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) - .copy(version = updatedVersion) + ReplicatedEventMetadata( + replication.replicaId, + _currentSequenceNumber, + updatedVersion, + concurrent = false))).copy(version = updatedVersion) if (setup.log.isTraceEnabled()) setup.log.traceN( @@ -501,9 +504,9 @@ private[akka] object Running { _currentSequenceNumber = state.seqNr val metadataTemplate: 
Option[ReplicatedEventMetadata] = setup.replication match { - case Some(aa) => - aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent - Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later + case Some(replication) => + replication.setContext(recoveryRunning = false, replication.replicaId, concurrent = false) // local events are never concurrent + Some(ReplicatedEventMetadata(replication.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later case None => None } @@ -685,7 +688,8 @@ private[akka] object Running { onWriteSuccess(setup.context, p) if (setup.publishEvents) { - val meta = setup.replication.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version)) + val meta = setup.replication.map(replication => + new ReplicatedPublishedEventMetaData(replication.replicaId, state.version)) context.system.eventStream ! EventStream.Publish( PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta)) }
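
The last-writer-wins rule and the version vector comparison described in the `replicated-eventsourcing.md` changes above can be illustrated with a small self-contained sketch. This is not the actual `akka.persistence.typed.crdt.LwwTime` or the internal version vector implementation: the names `LwwTimeSketch`, `Relation`, `compare` and the `Map[String, Long]` representation of a version vector are assumptions made purely for illustration.

```scala
// Illustrative sketches of the rules described in the docs - not the real Akka classes.
object ReplicatedEventSourcingSketches {

  // Last-writer-wins timestamp: the greatest timestamp wins; on a tie,
  // the replica id that sorts first in alphanumeric order wins.
  final case class LwwTimeSketch(timestamp: Long, originReplica: String) {

    def isAfter(other: LwwTimeSketch): Boolean =
      if (timestamp > other.timestamp) true
      else if (timestamp < other.timestamp) false
      else if (other.originReplica.compareTo(originReplica) > 0) true
      else false

    // Timestamp for the next event: the wall clock if it moved forward,
    // otherwise the previous timestamp + 1 so it never goes backwards.
    def increase(wallClockMillis: Long, replica: String): LwwTimeSketch =
      LwwTimeSketch(math.max(wallClockMillis, timestamp + 1), replica)
  }

  // Version vector comparison as described in "Concurrent updates":
  // SAME, BEFORE, AFTER or CONCURRENT.
  sealed trait Relation
  case object Same extends Relation
  case object Before extends Relation
  case object After extends Relation
  case object Concurrent extends Relation

  def compare(v1: Map[String, Long], v2: Map[String, Long]): Relation = {
    val keys = v1.keySet ++ v2.keySet
    val someLess = keys.exists(k => v1.getOrElse(k, 0L) < v2.getOrElse(k, 0L))
    val someGreater = keys.exists(k => v1.getOrElse(k, 0L) > v2.getOrElse(k, 0L))
    (someLess, someGreater) match {
      case (false, false) => Same       // all entries equal
      case (true, false)  => Before     // v1 <= v2 everywhere, strictly less somewhere
      case (false, true)  => After      // v1 >= v2 everywhere, strictly greater somewhere
      case (true, true)   => Concurrent // neither dominates the other
    }
  }

  def main(args: Array[String]): Unit = {
    val a = LwwTimeSketch(timestamp = 100L, originReplica = "DC-A")
    val b = LwwTimeSketch(timestamp = 100L, originReplica = "DC-B")
    println(a.isAfter(b)) // true: equal timestamps, "DC-A" sorts before "DC-B"

    println(compare(Map("DC-A" -> 2L, "DC-B" -> 1L), Map("DC-A" -> 1L, "DC-B" -> 2L))) // Concurrent
    println(compare(Map("DC-A" -> 1L), Map("DC-A" -> 1L, "DC-B" -> 1L)))               // Before
  }
}
```

In the real API the timestamp passed to `increase` would typically come from `ReplicationContext.currentTimeMillis()`, which, as described in the docs above, is guaranteed never to go backwards.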