Active active java apis #29266
This commit is contained in:
parent
116c13677a
commit
21713bbc30
13 changed files with 563 additions and 32 deletions
|
|
@ -53,32 +53,47 @@ To assist in implementing the event handler active-active detects these conflict
|
|||
|
||||
## API
|
||||
|
||||
The same API as regular `EventSourcedBehavior`s is used to define the logic. To enable an entity for active-active
|
||||
replication use the factory methods on @api[ActiveActiveEventSourcing].
|
||||
@scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic.
|
||||
|
||||
To enable an entity for active-active
|
||||
replication @java[let it extend `ActiveActiveEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @apidoc[ActiveActiveEventSourcing].
|
||||
|
||||
All replicas need to be known up front:
|
||||
|
||||
Scala
|
||||
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #replicas }
|
||||
|
||||
Java
|
||||
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #replicas }
|
||||
|
||||
|
||||
Then to enable replication create the event sourced behavior with the factory method:
|
||||
|
||||
Scala
|
||||
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory }
|
||||
|
||||
Java
|
||||
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory }
|
||||
|
||||
The factory takes in:
|
||||
|
||||
* EntityID: this will be used as part of the underlying persistenceId
|
||||
* Replica: Which replica this instance is
|
||||
* All Replicas and the query plugin used to read their events
|
||||
* All Replicas and the query plugin used to read their events
|
||||
* A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`]
|
||||
|
||||
In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used.
|
||||
|
||||
Scala
|
||||
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory-shared}
|
||||
|
||||
Java
|
||||
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory-shared }
|
||||
|
||||
The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @api[ActiveActiveContext] that has the following methods:
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ActiveActiveContext] that has the following methods:
|
||||
|
||||
* entityId
|
||||
* replicaId
|
||||
|
|
@ -87,6 +102,24 @@ The function passed to both factory methods return an `EventSourcedBehavior` and
|
|||
|
||||
As well as methods that **can only be** used in the event handler. The values these methods return relate to the event that is being processed.
|
||||
|
||||
@@@
|
||||
|
||||
@@@ div { .group-java }
|
||||
|
||||
The function passed to both factory methods is invoked with a special @apidoc[ActiveActiveContext] that needs to be passed to the
|
||||
concrete `ActiveActiveEventSourcedBehavior` and on to the super constructor.
|
||||
|
||||
The context gives access to:
|
||||
|
||||
* entityId
|
||||
* replicaId
|
||||
* allReplicas
|
||||
* persistenceId
|
||||
|
||||
As well as methods that **can only be** used in the event handler, accessed through `getActiveActiveContext`. The values these methods return relate to the event that is being processed.
|
||||
|
||||
@@@
|
||||
|
||||
* origin: The ReplicaId that originally created the event
|
||||
* concurrent: Whether the event was concurrent with another event as in the second diagram above
|
||||
* recoveryRunning: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once.
|
||||
|
|
@ -108,9 +141,9 @@ Sometimes it is enough to use timestamps to decide which update should win. Such
|
|||
|
||||

|
||||
|
||||
There is a small utility class @api[LwwTime] that can be useful for implementing last writer wins semantics.
|
||||
There is a small utility class @apidoc[akka.persistence.typed.LwwTime] that can be useful for implementing last writer wins semantics.
|
||||
It contains a timestamp representing current time when the event was persisted and an identifier of the
|
||||
replica that persisted it. When comparing two @api[LwwTime] the greatest timestamp wins. The replica
|
||||
replica that persisted it. When comparing two @apidoc[akka.persistence.typed.LwwTime] the greatest timestamp wins. The replica
|
||||
identifier is used if the two timestamps are equal, and then the one from the data center sorted first in
|
||||
alphanumeric order wins.
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed;
|
||||
|
||||
import akka.Done;
|
||||
import akka.actor.testkit.typed.javadsl.LogCapturing;
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.persistence.testkit.PersistenceTestKitPlugin;
|
||||
import akka.persistence.testkit.javadsl.PersistenceTestKit;
|
||||
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
|
||||
import akka.persistence.typed.javadsl.*;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.scalatestplus.junit.JUnitSuite;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static akka.Done.done;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ActiveActiveTest extends JUnitSuite {
|
||||
|
||||
static final class TestBehavior
|
||||
extends ActiveActiveEventSourcedBehavior<TestBehavior.Command, String, Set<String>> {
|
||||
interface Command {}
|
||||
|
||||
static final class GetState implements Command {
|
||||
final ActorRef<State> replyTo;
|
||||
|
||||
public GetState(ActorRef<State> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
static final class StoreMe implements Command {
|
||||
final String text;
|
||||
final ActorRef<Done> replyTo;
|
||||
|
||||
public StoreMe(String text, ActorRef<Done> replyTo) {
|
||||
this.text = text;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
static final class StoreUs implements Command {
|
||||
final List<String> texts;
|
||||
final ActorRef<Done> replyTo;
|
||||
|
||||
public StoreUs(List<String> texts, ActorRef<Done> replyTo) {
|
||||
this.texts = texts;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
static final class GetReplica implements Command {
|
||||
final ActorRef<ReplicaId> replyTo;
|
||||
|
||||
public GetReplica(ActorRef<ReplicaId> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
static final class State {
|
||||
final Set<String> texts;
|
||||
|
||||
public State(Set<String> texts) {
|
||||
this.texts = texts;
|
||||
}
|
||||
}
|
||||
|
||||
enum Stop implements Command {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
public static Behavior<Command> create(
|
||||
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
|
||||
return ActiveActiveEventSourcing.withSharedJournal(
|
||||
entityId,
|
||||
replicaId,
|
||||
allReplicas,
|
||||
PersistenceTestKitReadJournal.Identifier(),
|
||||
TestBehavior::new);
|
||||
}
|
||||
|
||||
private TestBehavior(ActiveActiveContext activeActiveContext) {
|
||||
super(activeActiveContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String journalPluginId() {
|
||||
return PersistenceTestKitPlugin.PluginId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> emptyState() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommandHandler<Command, String, Set<String>> commandHandler() {
|
||||
return newCommandHandlerBuilder()
|
||||
.forAnyState()
|
||||
.onCommand(
|
||||
StoreMe.class,
|
||||
(StoreMe cmd) -> Effect().persist(cmd.text).thenRun(__ -> cmd.replyTo.tell(done())))
|
||||
.onCommand(
|
||||
StoreUs.class,
|
||||
(StoreUs cmd) -> Effect().persist(cmd.texts).thenRun(__ -> cmd.replyTo.tell(done())))
|
||||
.onCommand(
|
||||
GetState.class,
|
||||
(GetState get) ->
|
||||
Effect()
|
||||
.none()
|
||||
.thenRun(state -> get.replyTo.tell(new State(new HashSet<>(state)))))
|
||||
.onCommand(
|
||||
GetReplica.class,
|
||||
(GetReplica cmd) ->
|
||||
Effect()
|
||||
.none()
|
||||
.thenRun(() -> cmd.replyTo.tell(getActiveActiveContext().replicaId())))
|
||||
.onCommand(Stop.class, __ -> Effect().stop())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandler<Set<String>, String> eventHandler() {
|
||||
return newEventHandlerBuilder()
|
||||
.forAnyState()
|
||||
.onAnyEvent(
|
||||
(state, text) -> {
|
||||
// FIXME mutable - state I don't remember if we support or not so defensive copy for
|
||||
// now
|
||||
Set<String> newSet = new HashSet<>(state);
|
||||
newSet.add(text);
|
||||
return newSet;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ClassRule
|
||||
public static final TestKitJunitResource testKit =
|
||||
new TestKitJunitResource(
|
||||
ConfigFactory.parseString(
|
||||
"akka.loglevel = INFO\n" + "akka.loggers = [\"akka.testkit.TestEventListener\"]")
|
||||
.withFallback(PersistenceTestKitPlugin.getInstance().config()));
|
||||
|
||||
@Rule public final LogCapturing logCapturing = new LogCapturing();
|
||||
|
||||
// minimal test, full coverage over in ActiveActiveSpec
|
||||
@Test
|
||||
public void activeActiveReplicationTest() {
|
||||
ReplicaId dcA = new ReplicaId("DC-A");
|
||||
ReplicaId dcB = new ReplicaId("DC-B");
|
||||
ReplicaId dcC = new ReplicaId("DC-C");
|
||||
Set<ReplicaId> allReplicas = new HashSet<>(Arrays.asList(dcA, dcB, dcC));
|
||||
|
||||
ActorRef<TestBehavior.Command> replicaA =
|
||||
testKit.spawn(TestBehavior.create("id1", dcA, allReplicas));
|
||||
ActorRef<TestBehavior.Command> replicaB =
|
||||
testKit.spawn(TestBehavior.create("id1", dcB, allReplicas));
|
||||
ActorRef<TestBehavior.Command> replicaC =
|
||||
testKit.spawn(TestBehavior.create("id1", dcC, allReplicas));
|
||||
|
||||
TestProbe<Object> probe = testKit.createTestProbe();
|
||||
replicaA.tell(new TestBehavior.GetReplica(probe.ref().narrow()));
|
||||
assertEquals("DC-A", probe.expectMessageClass(ReplicaId.class).id());
|
||||
|
||||
replicaA.tell(new TestBehavior.StoreMe("stored-to-a", probe.ref().narrow()));
|
||||
replicaB.tell(new TestBehavior.StoreMe("stored-to-b", probe.ref().narrow()));
|
||||
replicaC.tell(new TestBehavior.StoreMe("stored-to-c", probe.ref().narrow()));
|
||||
probe.receiveSeveralMessages(3);
|
||||
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
replicaA.tell(new TestBehavior.GetState(probe.ref().narrow()));
|
||||
TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class);
|
||||
assertEquals(
|
||||
reply.texts,
|
||||
new HashSet<String>(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c")));
|
||||
return null;
|
||||
});
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
replicaB.tell(new TestBehavior.GetState(probe.ref().narrow()));
|
||||
TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class);
|
||||
assertEquals(
|
||||
reply.texts,
|
||||
new HashSet<String>(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c")));
|
||||
return null;
|
||||
});
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
replicaC.tell(new TestBehavior.GetState(probe.ref().narrow()));
|
||||
TestBehavior.State reply = probe.expectMessageClass(TestBehavior.State.class);
|
||||
assertEquals(
|
||||
reply.texts,
|
||||
new HashSet<String>(Arrays.asList("stored-to-a", "stored-to-b", "stored-to-c")));
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.akka.persistence.typed;
|
||||
|
||||
import akka.persistence.typed.ReplicaId;
|
||||
import akka.persistence.typed.javadsl.*;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class ActiveActiveCompileOnlyTest {
|
||||
|
||||
// dummy for docs example
|
||||
interface Command {}
|
||||
interface Event {}
|
||||
interface State {}
|
||||
|
||||
static // #factory
|
||||
final class MyActiceActiveEventSourcedBehavior
|
||||
extends ActiveActiveEventSourcedBehavior<
|
||||
Command,
|
||||
Event,
|
||||
State> {
|
||||
|
||||
public MyActiceActiveEventSourcedBehavior(ActiveActiveContext activeActiveContext) {
|
||||
super(activeActiveContext);
|
||||
}
|
||||
// ... implementation of abstract methods ...
|
||||
// #factory
|
||||
|
||||
@Override
|
||||
public State emptyState() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommandHandler<Command, Event, State> commandHandler() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandler<State, Event> eventHandler() {
|
||||
return null;
|
||||
}
|
||||
// #factory
|
||||
}
|
||||
|
||||
// #factory
|
||||
|
||||
{
|
||||
// #replicas
|
||||
ReplicaId DCA = new ReplicaId("DC-A");
|
||||
ReplicaId DCB = new ReplicaId("DC-B");
|
||||
Set<ReplicaId> allReplicas =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB)));
|
||||
// #replicas
|
||||
|
||||
String queryPluginId = "";
|
||||
|
||||
// #factory-shared
|
||||
ActiveActiveEventSourcing.withSharedJournal(
|
||||
"entityId",
|
||||
DCA,
|
||||
allReplicas,
|
||||
queryPluginId,
|
||||
context -> new MyActiceActiveEventSourcedBehavior(context));
|
||||
// #factory-shared
|
||||
|
||||
// #factory
|
||||
|
||||
// bootstrap logic
|
||||
Map<ReplicaId, String> allReplicasAndQueryPlugins = new HashMap<>();
|
||||
allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
|
||||
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
|
||||
|
||||
EventSourcedBehavior<Command, Event, State> behavior = ActiveActiveEventSourcing.create(
|
||||
"entityId",
|
||||
DCA,
|
||||
allReplicasAndQueryPlugins,
|
||||
context -> new MyActiceActiveEventSourcedBehavior(context));
|
||||
// #factory
|
||||
}
|
||||
}
|
||||
|
|
@ -5,18 +5,19 @@
|
|||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
|
||||
import akka.actor.testkit.typed.scaladsl.{LogCapturing, ScalaTestWithActorTestKit}
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
|
||||
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
|
||||
import akka.persistence.testkit.PersistenceTestKitPlugin
|
||||
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
|
||||
import akka.persistence.typed.LwwTime
|
||||
import akka.persistence.typed.ReplicaId
|
||||
import akka.persistence.typed.crdt.LwwTime
|
||||
import akka.persistence.typed.scaladsl._
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
|
||||
import org.scalatest.concurrent.{Eventually, ScalaFutures}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.{ Millis, Span }
|
||||
import org.scalatest.time.{Millis, Span}
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
object AABlogExampleSpec {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,14 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.persistence.typed.ReplicaId
|
||||
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, EventSourcedBehavior }
|
||||
import akka.persistence.typed.scaladsl.{ActiveActiveEventSourcing, EventSourcedBehavior}
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
@silent("never used")
|
||||
object ActiveActiveCompileOnlySpec {
|
||||
|
||||
//#replicas
|
||||
|
|
@ -13,15 +19,19 @@ object ActiveActiveCompileOnlySpec {
|
|||
|
||||
val queryPluginId = ""
|
||||
|
||||
trait Command
|
||||
trait State
|
||||
trait Event
|
||||
|
||||
//#factory-shared
|
||||
ActiveActiveEventSourcing.withSharedJournal("entityId", DCA, AllReplicas, queryPluginId) { context =>
|
||||
EventSourcedBehavior(???, ???, ???, ???)
|
||||
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
||||
}
|
||||
//#factory-shared
|
||||
|
||||
//#factory
|
||||
ActiveActiveEventSourcing("entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
|
||||
EventSourcedBehavior(???, ???, ???, ???)
|
||||
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
||||
}
|
||||
//#factory
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,4 @@
|
|||
# Changes to internal/private
|
||||
# Changes to internal/private/do not extend
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withActiveActive")
|
||||
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.Running*")
|
||||
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl.*")
|
||||
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.BehaviorSetup*")
|
||||
|
||||
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.*")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed
|
||||
|
||||
/**
|
||||
* Utility class for comparing timestamp and data center
|
||||
* identifier when implementing last-writer wins.
|
||||
*/
|
||||
final case class LwwTime(timestamp: Long, originDc: ReplicaId) {
|
||||
|
||||
/**
|
||||
* Create a new `LwwTime` that has a `timestamp` that is
|
||||
* `max` of the given timestamp and previous timestamp + 1,
|
||||
* i.e. monotonically increasing.
|
||||
*/
|
||||
def increase(t: Long, replicaId: ReplicaId): LwwTime =
|
||||
LwwTime(math.max(timestamp + 1, t), replicaId)
|
||||
|
||||
/**
|
||||
* Compare this `LwwTime` with the `other`.
|
||||
* Greatest timestamp wins. If both timestamps are
|
||||
* equal the `dc` identifiers are compared and the
|
||||
* one sorted first in alphanumeric order wins.
|
||||
*/
|
||||
def isAfter(other: LwwTime): Boolean = {
|
||||
if (timestamp > other.timestamp) true
|
||||
else if (timestamp < other.timestamp) false
|
||||
else if (other.originDc.id.compareTo(originDc.id) > 0) true
|
||||
else false
|
||||
}
|
||||
}
|
||||
|
|
@ -252,10 +252,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
}
|
||||
|
||||
override private[akka] def withActiveActive(
|
||||
context: ActiveActiveContextImpl,
|
||||
replicaId: ReplicaId,
|
||||
allReplicaIdsAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] = {
|
||||
copy(activeActive = Some(ActiveActive(replicaId, allReplicaIdsAndQueryPlugins, context)))
|
||||
context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State] = {
|
||||
copy(activeActive = Some(ActiveActive(context.replicaId, context.replicasAndQueryPlugins, context)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed.javadsl
|
||||
|
||||
import java.util.Optional
|
||||
|
||||
import akka.actor.typed.BackoffSupervisorStrategy
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.typed.internal
|
||||
import akka.persistence.typed.internal.EffectImpl
|
||||
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
|
||||
|
||||
abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
|
||||
activeActiveContext: ActiveActiveContext,
|
||||
onPersistFailure: Optional[BackoffSupervisorStrategy])
|
||||
extends EventSourcedBehavior[Command, Event, State](activeActiveContext.persistenceId, onPersistFailure) {
|
||||
|
||||
def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty())
|
||||
|
||||
protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext
|
||||
|
||||
/**
|
||||
* INTERNAL API: DeferredBehavior init, not for user extension
|
||||
*/
|
||||
@InternalApi override def apply(context: TypedActorContext[Command]): Behavior[Command] = {
|
||||
// Note: duplicated in EventSourcedBehavior to not break source compatibility
|
||||
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
|
||||
|
||||
val tagger: Event => Set[String] = { event =>
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
val tags = tagsFor(event)
|
||||
if (tags.isEmpty) Set.empty
|
||||
else tags.asScala.toSet
|
||||
}
|
||||
|
||||
val behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State](
|
||||
persistenceId,
|
||||
emptyState,
|
||||
(state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
|
||||
eventHandler()(_, _),
|
||||
getClass)
|
||||
.snapshotWhen(snapshotWhen)
|
||||
.withRetention(retentionCriteria.asScala)
|
||||
.withTagger(tagger)
|
||||
.eventAdapter(eventAdapter())
|
||||
.snapshotAdapter(snapshotAdapter())
|
||||
.withJournalPluginId(journalPluginId)
|
||||
.withSnapshotPluginId(snapshotPluginId)
|
||||
.withRecovery(recovery.asScala)
|
||||
// context not user extendable so there should never be any other impls
|
||||
.withActiveActive(activeActiveContext.asInstanceOf[ActiveActiveContextImpl])
|
||||
|
||||
val handler = signalHandler()
|
||||
val behaviorWithSignalHandler =
|
||||
if (handler.isEmpty) behavior
|
||||
else behavior.receiveSignal(handler.handler)
|
||||
|
||||
if (onPersistFailure.isPresent)
|
||||
behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
|
||||
else
|
||||
behaviorWithSignalHandler
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.typed.javadsl
|
||||
|
||||
import java.util.function.{ Function => JFunction }
|
||||
import java.util.{ Set => JSet }
|
||||
import java.util.{ Map => JMap }
|
||||
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.ReplicaId
|
||||
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* Provides access to Active Active specific state
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
trait ActiveActiveContext {
|
||||
def origin: ReplicaId
|
||||
def concurrent: Boolean
|
||||
def replicaId: ReplicaId
|
||||
def getAllReplicas: JSet[ReplicaId]
|
||||
def persistenceId: PersistenceId
|
||||
def recoveryRunning: Boolean
|
||||
def entityId: String
|
||||
def currentTimeMillis(): Long
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory to create an instance of an ActiveActiveEventSourcedBehavior
|
||||
*/
|
||||
@FunctionalInterface
|
||||
trait ActiveActiveBehaviorFactory[Command, Event, State] {
|
||||
def apply(aaContext: ActiveActiveContext): ActiveActiveEventSourcedBehavior[Command, Event, State]
|
||||
}
|
||||
|
||||
object ActiveActiveEventSourcing {
|
||||
|
||||
/**
|
||||
* Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal.
|
||||
*
|
||||
* Events from each replica for the same entityId will be replicated to every copy.
|
||||
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
|
||||
*
|
||||
* Using an replicated event sourced behavior means there is no longer the single writer guarantee.
|
||||
*
|
||||
* A different journal plugin id can be configured using withJournalPluginId after creation. Different databases
|
||||
* can be used for each replica.
|
||||
* The events from other replicas are read using PersistentQuery.
|
||||
*
|
||||
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
|
||||
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
|
||||
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
|
||||
*/
|
||||
def withSharedJournal[Command, Event, State](
|
||||
entityId: String,
|
||||
replicaId: ReplicaId,
|
||||
allReplicaIds: JSet[ReplicaId],
|
||||
queryPluginId: String,
|
||||
behaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]])
|
||||
: EventSourcedBehavior[Command, Event, State] =
|
||||
create(entityId, replicaId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory)
|
||||
|
||||
/**
|
||||
* Initialize a replicated event sourced behavior.
|
||||
*
|
||||
* Events from each replica for the same entityId will be replicated to every copy.
|
||||
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
|
||||
*
|
||||
* Using an replicated event sourced behavior means there is no longer the single writer guarantee.
|
||||
*
|
||||
* The journal plugin id for the entity itself can be configured using withJournalPluginId after creation.
|
||||
* A query side identifier is passed per replica allowing for separate database/journal configuration per
|
||||
* replica. The events from other replicas are read using PersistentQuery.
|
||||
*
|
||||
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
|
||||
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
|
||||
* and configured with the query plugin for the journal that each replica uses.
|
||||
*/
|
||||
def create[Command, Event, State](
|
||||
entityId: String,
|
||||
replicaId: ReplicaId,
|
||||
allReplicasAndQueryPlugins: JMap[ReplicaId, String],
|
||||
eventSourcedBehaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]])
|
||||
: EventSourcedBehavior[Command, Event, State] = {
|
||||
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap)
|
||||
eventSourcedBehaviorFactory(context)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -176,9 +176,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
|
|||
def snapshotAdapter(): SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State]
|
||||
|
||||
/**
|
||||
* INTERNAL API: DeferredBehavior init
|
||||
* INTERNAL API: DeferredBehavior init, not for user extension
|
||||
*/
|
||||
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
|
||||
// Note: duplicated in ActiveActiveEventSourcedBehavior to not break source compatibility
|
||||
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
|
||||
|
||||
val tagger: Event => Set[String] = { event =>
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.persistence.typed.scaladsl
|
|||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.ReplicaId
|
||||
import akka.util.WallClock
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
// FIXME docs
|
||||
trait ActiveActiveContext {
|
||||
|
|
@ -30,7 +31,8 @@ private[akka] class ActiveActiveContextImpl(
|
|||
val entityId: String,
|
||||
val replicaId: ReplicaId,
|
||||
val replicasAndQueryPlugins: Map[ReplicaId, String])
|
||||
extends ActiveActiveContext {
|
||||
extends ActiveActiveContext
|
||||
with akka.persistence.typed.javadsl.ActiveActiveContext {
|
||||
val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet
|
||||
var _origin: ReplicaId = null
|
||||
var _recoveryRunning: Boolean = false
|
||||
|
|
@ -56,6 +58,8 @@ private[akka] class ActiveActiveContextImpl(
|
|||
WallClock.AlwaysIncreasingClock.currentTimeMillis()
|
||||
}
|
||||
override def recoveryRunning: Boolean = _recoveryRunning
|
||||
|
||||
override def getAllReplicas: java.util.Set[ReplicaId] = allReplicas.asJava
|
||||
}
|
||||
|
||||
object ActiveActiveEventSourcing {
|
||||
|
|
@ -80,9 +84,10 @@ object ActiveActiveEventSourcing {
|
|||
entityId: String,
|
||||
replicaId: ReplicaId,
|
||||
allReplicaIds: Set[ReplicaId],
|
||||
queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
|
||||
queryPluginId: String)(
|
||||
eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
|
||||
: EventSourcedBehavior[Command, Event, State] =
|
||||
apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(activeActiveContext)
|
||||
apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory)
|
||||
|
||||
/**
|
||||
* Initialize a replicated event sourced behavior.
|
||||
|
|
@ -104,10 +109,10 @@ object ActiveActiveEventSourcing {
|
|||
entityId: String,
|
||||
replicaId: ReplicaId,
|
||||
allReplicasAndQueryPlugins: Map[ReplicaId, String])(
|
||||
activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
|
||||
eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
|
||||
: EventSourcedBehavior[Command, Event, State] = {
|
||||
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins)
|
||||
activeActiveContext(context).withActiveActive(context, replicaId, allReplicasAndQueryPlugins)
|
||||
eventSourcedBehaviorFactory(context).withActiveActive(context)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -166,10 +166,7 @@ object EventSourcedBehavior {
|
|||
*/
|
||||
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
private[akka] def withActiveActive(
|
||||
context: ActiveActiveContextImpl,
|
||||
replicaId: ReplicaId,
|
||||
allReplicasAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State]
|
||||
private[akka] def withActiveActive(context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State]
|
||||
|
||||
/**
|
||||
* Change the snapshot store plugin id that this actor should use.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue