From 21713bbc30d90abc5b197e4d621312d8cc2e494e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 20 Jul 2020 10:32:36 +0200 Subject: [PATCH] Active active java apis #29266 --- .../typed/persistence-active-active.md | 45 +++- .../persistence/typed/ActiveActiveTest.java | 208 ++++++++++++++++++ .../typed/ActiveActiveCompileOnlyTest.java | 84 +++++++ .../persistence/typed/AABlogExampleSpec.scala | 9 +- .../typed/ActiveActiveCompileOnlySpec.scala | 16 +- ...9217-active-active-event-sourcing.excludes | 8 +- .../akka/persistence/typed/LwwTime.scala | 33 +++ .../internal/EventSourcedBehaviorImpl.scala | 6 +- .../ActiveActiveEventSourcedBehavior.scala | 67 ++++++ .../javadsl/ActiveActiveEventSourcing.scala | 96 ++++++++ .../typed/javadsl/EventSourcedBehavior.scala | 3 +- .../scaladsl/ActiveActiveEventSourcing.scala | 15 +- .../typed/scaladsl/EventSourcedBehavior.scala | 5 +- 13 files changed, 563 insertions(+), 32 deletions(-) create mode 100644 akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ActiveActiveTest.java create mode 100644 akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active.md b/akka-docs/src/main/paradox/typed/persistence-active-active.md index bfa8dce78a..4b3573f003 100644 --- a/akka-docs/src/main/paradox/typed/persistence-active-active.md +++ b/akka-docs/src/main/paradox/typed/persistence-active-active.md @@ -53,32 +53,47 @@ To assist in implementing the event handler active-active detects these conflict ## API -The same API as regular `EventSourcedBehavior`s is used to define the logic. To enable an entity for active-active -replication use the factory methods on @api[ActiveActiveEventSourcing]. +@scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic. + +To enable an entity for active-active +replication @java[let it extend `ActiveActiveEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @apidoc[ActiveActiveEventSourcing]. All replicas need to be known up front: Scala : @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #replicas } +Java +: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #replicas } + + Then to enable replication create the event sourced behavior with the factory method: Scala : @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory } +Java +: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory } + The factory takes in: * EntityID: this will be used as part of the underlying persistenceId * Replica: Which replica this instance is -* All Replicas and the query plugin used to read their events +* All Replicas and the query plugin used to read their events +* A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`] In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used. Scala : @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory-shared} +Java +: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory-shared } -The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @api[ActiveActiveContext] that has the following methods: + +@@@ div { .group-scala } + +The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ActiveActiveContext] that has the following methods: * entityId * replicaId @@ -87,6 +102,24 @@ The function passed to both factory methods return an `EventSourcedBehavior` and As well as methods that **can only be** used in the event handler. The values these methods return relate to the event that is being processed. +@@@ + +@@@ div { .group-java } + +The function passed to both factory methods is invoked with a special @apidoc[ActiveActiveContext] that needs to be passed to the +concrete `ActiveActiveEventSourcedBehavior` and on to the super constructor. + +The context gives access to: + +* entityId +* replicaId +* allReplicas +* persistenceId + +As well as methods that **can only be** used in the event handler, accessed through `getActiveActiveContext`. The values these methods return relate to the event that is being processed. + +@@@ + * origin: The ReplicaId that originally created the event * concurrent: Whether the event was concurrent with another event as in the second diagram above * recoveryRunning: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once. @@ -108,9 +141,9 @@ Sometimes it is enough to use timestamps to decide which update should win. Such ![images/lww.png](images/lww.png) -There is a small utility class @api[LwwTime] that can be useful for implementing last writer wins semantics. +There is a small utility class @apidoc[akka.persistence.typed.LwwTime] that can be useful for implementing last writer wins semantics. It contains a timestamp representing current time when the event was persisted and an identifier of the -replica that persisted it. When comparing two @api[LwwTime] the greatest timestamp wins. The replica +replica that persisted it. When comparing two @apidoc[akka.persistence.typed.LwwTime] the greatest timestamp wins. The replica identifier is used if the two timestamps are equal, and then the one from the data center sorted first in alphanumeric order wins. diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ActiveActiveTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ActiveActiveTest.java new file mode 100644 index 0000000000..38299e1530 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ActiveActiveTest.java @@ -0,0 +1,208 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed; + +import akka.Done; +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.persistence.testkit.PersistenceTestKitPlugin; +import akka.persistence.testkit.javadsl.PersistenceTestKit; +import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.javadsl.*; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +import java.util.*; + +import static akka.Done.done; +import static org.junit.Assert.assertEquals; + +public class ActiveActiveTest extends JUnitSuite { + + static final class TestBehavior + extends ActiveActiveEventSourcedBehavior> { + interface Command {} + + static final class GetState implements Command { + final ActorRef replyTo; + + public GetState(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static final class StoreMe implements Command { + final String text; + final ActorRef replyTo; + + public StoreMe(String text, ActorRef replyTo) { + this.text = text; + this.replyTo = replyTo; + } + } + + static final class StoreUs implements Command { + final List texts; + final ActorRef replyTo; + + public StoreUs(List texts, ActorRef replyTo) { + this.texts = texts; + this.replyTo = replyTo; + } + } + + static final class GetReplica implements Command { + final ActorRef replyTo; + + public GetReplica(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static final class State { + final Set texts; + + public State(Set texts) { + this.texts = texts; + } + } + + enum Stop implements Command { + INSTANCE + } + + public static Behavior create( + String entityId, ReplicaId replicaId, Set allReplicas) { + return ActiveActiveEventSourcing.withSharedJournal( + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier(), + TestBehavior::new); + } + + private TestBehavior(ActiveActiveContext activeActiveContext) { + super(activeActiveContext); + } + + @Override + public String journalPluginId() { + return PersistenceTestKitPlugin.PluginId(); + } + + @Override + public Set emptyState() { + return Collections.emptySet(); + } + + @Override + public CommandHandler> commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand( + StoreMe.class, + (StoreMe cmd) -> Effect().persist(cmd.text).thenRun(__ -> cmd.replyTo.tell(done()))) + .onCommand( + StoreUs.class, + (StoreUs cmd) -> Effect().persist(cmd.texts).thenRun(__ -> cmd.replyTo.tell(done()))) + .onCommand( + GetState.class, + (GetState get) -> + Effect() + .none() + .thenRun(state -> get.replyTo.tell(new State(new HashSet<>(state))))) + .onCommand( + GetReplica.class, + (GetReplica cmd) -> + Effect() + .none() + .thenRun(() -> cmd.replyTo.tell(getActiveActiveContext().replicaId()))) + .onCommand(Stop.class, __ -> Effect().stop()) + .build(); + } + + @Override + public EventHandler, String> eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onAnyEvent( + (state, text) -> { + // FIXME mutable - state I don't remember if we support or not so defensive copy for + // now + Set newSet = new HashSet<>(state); + newSet.add(text); + return newSet; + }); + } + } + + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource( + ConfigFactory.parseString( + "akka.loglevel = INFO\n" + "akka.loggers = [\"akka.testkit.TestEventListener\"]") + .withFallback(PersistenceTestKitPlugin.getInstance().config())); + + @Rule public final LogCapturing logCapturing = new LogCapturing(); + + // minimal test, full coverage over in ActiveActiveSpec + @Test + public void activeActiveReplicationTest() { + ReplicaId dcA = new ReplicaId("DC-A"); + ReplicaId dcB = new ReplicaId("DC-B"); + ReplicaId dcC = new ReplicaId("DC-C"); + Set allReplicas = new HashSet<>(Arrays.asList(dcA, dcB, dcC)); + + ActorRef replicaA = + testKit.spawn(TestBehavior.create("id1", dcA, allReplicas)); + ActorRef replicaB = + testKit.spawn(TestBehavior.create("id1", dcB, allReplicas)); + ActorRef replicaC = + testKit.spawn(TestBehavior.create("id1", dcC, allReplicas)); + + TestProbe probe = testKit.createTestProbe(); + replicaA.tell(new TestBehavior.GetReplica(probe.ref().narrow())); + assertEquals("DC-A", probe.expectMessageClass(ReplicaId.class).id()); + + replicaA.tell(new TestBehavior.StoreMe("stored-to-a", probe.ref().narrow())); + replicaB.tell(new TestBehavior.StoreMe("stored-to-b", probe.ref().narrow())); + replicaC.tell(new TestBehavior.StoreMe("stored-to-c", probe.ref().narrow())); + probe.receiveSeveralMessages(3); + + probe.awaitAssert( + () -> { + replicaA.tell(new TestBehavior.GetState(probe.ref().narrow())); + TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class); + assertEquals( + reply.texts, + new HashSet(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c"))); + return null; + }); + probe.awaitAssert( + () -> { + replicaB.tell(new TestBehavior.GetState(probe.ref().narrow())); + TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class); + assertEquals( + reply.texts, + new HashSet(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c"))); + return null; + }); + probe.awaitAssert( + () -> { + replicaC.tell(new TestBehavior.GetState(probe.ref().narrow())); + TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class); + assertEquals( + reply.texts, + new HashSet(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c"))); + return null; + }); + } +} diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java new file mode 100644 index 0000000000..51085a48b3 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.javadsl.*; + +import java.util.*; + +public class ActiveActiveCompileOnlyTest { + + // dummy for docs example + interface Command {} + interface Event {} + interface State {} + + static // #factory + final class MyActiceActiveEventSourcedBehavior + extends ActiveActiveEventSourcedBehavior< + Command, + Event, + State> { + + public MyActiceActiveEventSourcedBehavior(ActiveActiveContext activeActiveContext) { + super(activeActiveContext); + } + // ... implementation of abstract methods ... + // #factory + + @Override + public State emptyState() { + return null; + } + + @Override + public CommandHandler commandHandler() { + return null; + } + + @Override + public EventHandler eventHandler() { + return null; + } + // #factory + } + + // #factory + + { + // #replicas + ReplicaId DCA = new ReplicaId("DC-A"); + ReplicaId DCB = new ReplicaId("DC-B"); + Set allReplicas = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB))); + // #replicas + + String queryPluginId = ""; + + // #factory-shared + ActiveActiveEventSourcing.withSharedJournal( + "entityId", + DCA, + allReplicas, + queryPluginId, + context -> new MyActiceActiveEventSourcedBehavior(context)); + // #factory-shared + + // #factory + + // bootstrap logic + Map allReplicasAndQueryPlugins = new HashMap<>(); + allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); + allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); + + EventSourcedBehavior behavior = ActiveActiveEventSourcing.create( + "entityId", + DCA, + allReplicasAndQueryPlugins, + context -> new MyActiceActiveEventSourcedBehavior(context)); + // #factory + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala index f67fd19226..c3d7f4d365 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala @@ -5,18 +5,19 @@ package docs.akka.persistence.typed import akka.Done -import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.testkit.typed.scaladsl.{LogCapturing, ScalaTestWithActorTestKit} import akka.actor.typed.ActorRef -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.LwwTime import akka.persistence.typed.ReplicaId import akka.persistence.typed.crdt.LwwTime import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable -import org.scalatest.concurrent.{ Eventually, ScalaFutures } +import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{ Millis, Span } +import org.scalatest.time.{Millis, Span} import org.scalatest.wordspec.AnyWordSpecLike object AABlogExampleSpec { diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala index 55196657d3..dc0668ff6c 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala @@ -1,8 +1,14 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package docs.akka.persistence.typed import akka.persistence.typed.ReplicaId -import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, EventSourcedBehavior } +import akka.persistence.typed.scaladsl.{ActiveActiveEventSourcing, EventSourcedBehavior} +import com.github.ghik.silencer.silent +@silent("never used") object ActiveActiveCompileOnlySpec { //#replicas @@ -13,15 +19,19 @@ object ActiveActiveCompileOnlySpec { val queryPluginId = "" + trait Command + trait State + trait Event + //#factory-shared ActiveActiveEventSourcing.withSharedJournal("entityId", DCA, AllReplicas, queryPluginId) { context => - EventSourcedBehavior(???, ???, ???, ???) + EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) } //#factory-shared //#factory ActiveActiveEventSourcing("entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context => - EventSourcedBehavior(???, ???, ???, ???) + EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) } //#factory diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes index b0597f04c8..5a6490fdc2 100644 --- a/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes +++ b/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes @@ -1,6 +1,4 @@ -# Changes to internal/private +# Changes to internal/private/do not extend ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withActiveActive") -ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.Running*") -ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl.*") -ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.BehaviorSetup*") - +ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.*") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala new file mode 100644 index 0000000000..1342d8000c --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Utility class for comparing timestamp and data center + * identifier when implementing last-writer wins. + */ +final case class LwwTime(timestamp: Long, originDc: ReplicaId) { + + /** + * Create a new `LwwTime` that has a `timestamp` that is + * `max` of the given timestamp and previous timestamp + 1, + * i.e. monotonically increasing. + */ + def increase(t: Long, replicaId: ReplicaId): LwwTime = + LwwTime(math.max(timestamp + 1, t), replicaId) + + /** + * Compare this `LwwTime` with the `other`. + * Greatest timestamp wins. If both timestamps are + * equal the `dc` identifiers are compared and the + * one sorted first in alphanumeric order wins. + */ + def isAfter(other: LwwTime): Boolean = { + if (timestamp > other.timestamp) true + else if (timestamp < other.timestamp) false + else if (other.originDc.id.compareTo(originDc.id) > 0) true + else false + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index b9f032786d..27e47222e4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -252,10 +252,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( } override private[akka] def withActiveActive( - context: ActiveActiveContextImpl, - replicaId: ReplicaId, - allReplicaIdsAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] = { - copy(activeActive = Some(ActiveActive(replicaId, allReplicaIdsAndQueryPlugins, context))) + context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State] = { + copy(activeActive = Some(ActiveActive(context.replicaId, context.replicasAndQueryPlugins, context))) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala new file mode 100644 index 0000000000..d4ad096a74 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl + +import java.util.Optional + +import akka.actor.typed.BackoffSupervisorStrategy +import akka.actor.typed.Behavior +import akka.actor.typed.TypedActorContext +import akka.annotation.InternalApi +import akka.persistence.typed.internal +import akka.persistence.typed.internal.EffectImpl +import akka.persistence.typed.scaladsl.ActiveActiveContextImpl + +abstract class ActiveActiveEventSourcedBehavior[Command, Event, State]( + activeActiveContext: ActiveActiveContext, + onPersistFailure: Optional[BackoffSupervisorStrategy]) + extends EventSourcedBehavior[Command, Event, State](activeActiveContext.persistenceId, onPersistFailure) { + + def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty()) + + protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext + + /** + * INTERNAL API: DeferredBehavior init, not for user extension + */ + @InternalApi override def apply(context: TypedActorContext[Command]): Behavior[Command] = { + // Note: duplicated in EventSourcedBehavior to not break source compatibility + val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr) + + val tagger: Event => Set[String] = { event => + import akka.util.ccompat.JavaConverters._ + val tags = tagsFor(event) + if (tags.isEmpty) Set.empty + else tags.asScala.toSet + } + + val behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State]( + persistenceId, + emptyState, + (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], + eventHandler()(_, _), + getClass) + .snapshotWhen(snapshotWhen) + .withRetention(retentionCriteria.asScala) + .withTagger(tagger) + .eventAdapter(eventAdapter()) + .snapshotAdapter(snapshotAdapter()) + .withJournalPluginId(journalPluginId) + .withSnapshotPluginId(snapshotPluginId) + .withRecovery(recovery.asScala) + // context not user extendable so there should never be any other impls + .withActiveActive(activeActiveContext.asInstanceOf[ActiveActiveContextImpl]) + + val handler = signalHandler() + val behaviorWithSignalHandler = + if (handler.isEmpty) behavior + else behavior.receiveSignal(handler.handler) + + if (onPersistFailure.isPresent) + behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) + else + behaviorWithSignalHandler + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala new file mode 100644 index 0000000000..67761d410a --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl + +import java.util.function.{ Function => JFunction } +import java.util.{ Set => JSet } +import java.util.{ Map => JMap } + +import akka.annotation.DoNotInherit +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.scaladsl.ActiveActiveContextImpl + +import scala.collection.JavaConverters._ + +/** + * Provides access to Active Active specific state + * + * Not for user extension + */ +@DoNotInherit +trait ActiveActiveContext { + def origin: ReplicaId + def concurrent: Boolean + def replicaId: ReplicaId + def getAllReplicas: JSet[ReplicaId] + def persistenceId: PersistenceId + def recoveryRunning: Boolean + def entityId: String + def currentTimeMillis(): Long +} + +/** + * Factory to create an instance of an ActiveActiveEventSourcedBehavior + */ +@FunctionalInterface +trait ActiveActiveBehaviorFactory[Command, Event, State] { + def apply(aaContext: ActiveActiveContext): ActiveActiveEventSourcedBehavior[Command, Event, State] +} + +object ActiveActiveEventSourcing { + + /** + * Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal. + * + * Events from each replica for the same entityId will be replicated to every copy. + * Care must be taken to handle events in any order as events can happen concurrently at different replicas. + * + * Using an replicated event sourced behavior means there is no longer the single writer guarantee. + * + * A different journal plugin id can be configured using withJournalPluginId after creation. Different databases + * can be used for each replica. + * The events from other replicas are read using PersistentQuery. + * + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. + * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. + */ + def withSharedJournal[Command, Event, State]( + entityId: String, + replicaId: ReplicaId, + allReplicaIds: JSet[ReplicaId], + queryPluginId: String, + behaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]]) + : EventSourcedBehavior[Command, Event, State] = + create(entityId, replicaId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory) + + /** + * Initialize a replicated event sourced behavior. + * + * Events from each replica for the same entityId will be replicated to every copy. + * Care must be taken to handle events in any order as events can happen concurrently at different replicas. + * + * Using an replicated event sourced behavior means there is no longer the single writer guarantee. + * + * The journal plugin id for the entity itself can be configured using withJournalPluginId after creation. + * A query side identifier is passed per replica allowing for separate database/journal configuration per + * replica. The events from other replicas are read using PersistentQuery. + * + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas + * and configured with the query plugin for the journal that each replica uses. + */ + def create[Command, Event, State]( + entityId: String, + replicaId: ReplicaId, + allReplicasAndQueryPlugins: JMap[ReplicaId, String], + eventSourcedBehaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]]) + : EventSourcedBehavior[Command, Event, State] = { + val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap) + eventSourcedBehaviorFactory(context) + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 0b896c4b0e..00c6008121 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -176,9 +176,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( def snapshotAdapter(): SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State] /** - * INTERNAL API: DeferredBehavior init + * INTERNAL API: DeferredBehavior init, not for user extension */ @InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = { + // Note: duplicated in ActiveActiveEventSourcedBehavior to not break source compatibility val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr) val tagger: Event => Set[String] = { event => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala index 33a4a53e04..1f7b3dee1f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.scaladsl import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId import akka.util.WallClock +import scala.collection.JavaConverters._ // FIXME docs trait ActiveActiveContext { @@ -30,7 +31,8 @@ private[akka] class ActiveActiveContextImpl( val entityId: String, val replicaId: ReplicaId, val replicasAndQueryPlugins: Map[ReplicaId, String]) - extends ActiveActiveContext { + extends ActiveActiveContext + with akka.persistence.typed.javadsl.ActiveActiveContext { val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet var _origin: ReplicaId = null var _recoveryRunning: Boolean = false @@ -56,6 +58,8 @@ private[akka] class ActiveActiveContextImpl( WallClock.AlwaysIncreasingClock.currentTimeMillis() } override def recoveryRunning: Boolean = _recoveryRunning + + override def getAllReplicas: java.util.Set[ReplicaId] = allReplicas.asJava } object ActiveActiveEventSourcing { @@ -80,9 +84,10 @@ object ActiveActiveEventSourcing { entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId], - queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) + queryPluginId: String)( + eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = - apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(activeActiveContext) + apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory) /** * Initialize a replicated event sourced behavior. @@ -104,10 +109,10 @@ object ActiveActiveEventSourcing { entityId: String, replicaId: ReplicaId, allReplicasAndQueryPlugins: Map[ReplicaId, String])( - activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) + eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = { val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins) - activeActiveContext(context).withActiveActive(context, replicaId, allReplicasAndQueryPlugins) + eventSourcedBehaviorFactory(context).withActiveActive(context) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 56dcf975cf..eba5e0a6a8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -166,10 +166,7 @@ object EventSourcedBehavior { */ def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] - private[akka] def withActiveActive( - context: ActiveActiveContextImpl, - replicaId: ReplicaId, - allReplicasAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] + private[akka] def withActiveActive(context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State] /** * Change the snapshot store plugin id that this actor should use.