From 36a8b6f24a437adb75f59e3c29c7d3bf559d300b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 8 Jul 2020 11:20:45 +0200 Subject: [PATCH] Active active per replica journal selection (#29350) * Support for having multiple isolated testkit journals and corresponding read journals in persistence testkit * Read journal selection per active-active replica * a dedicated ReplicaId type to avoid stringly typed mismatches --- ...ctiveActiveShardingDirectReplication.scala | 7 +- ...eActiveShardingDirectReplicationSpec.scala | 14 ++- .../testkit/PersistenceTestKitPlugin.scala | 12 +- .../testkit/ProcessingPolicy.scala | 4 +- .../persistence/testkit/SnapshotStorage.scala | 7 +- .../internal/InMemStorageExtension.scala | 45 +++++-- .../SnapshotStorageEmulatorExtension.scala | 3 +- .../testkit/internal/TestKitStorage.scala | 3 +- ...ersistenceTestKitReadJournalProvider.scala | 6 +- .../PersistenceTestKitReadJournal.scala | 14 ++- .../testkit/scaladsl/PersistenceTestKit.scala | 2 +- .../scaladsl/MultipleJournalsSpec.scala | 115 ++++++++++++++++++ .../ActiveActiveEventPublishingSpec.scala | 56 +++++---- .../persistence/typed/ActiveActiveSpec.scala | 94 ++++++++------ .../typed/MultiJournalActiveActiveSpec.scala | 111 +++++++++++++++++ .../AAAuctionExampleSpec.scala | 41 ++++--- .../AABlogExampleSpec.scala | 33 +++-- .../persistence/typed/PersistenceId.scala | 8 +- .../akka/persistence/typed/ReplicaId.scala | 10 ++ .../typed/internal/BehaviorSetup.scala | 3 +- .../internal/EventSourcedBehaviorImpl.scala | 14 +-- .../typed/internal/ReplayingEvents.scala | 5 +- .../persistence/typed/internal/Running.scala | 13 +- .../scaladsl/ActiveActiveEventSourcing.scala | 63 +++++++--- .../typed/scaladsl/EventSourcedBehavior.scala | 17 +-- 25 files changed, 532 insertions(+), 168 deletions(-) create mode 100644 akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/MultipleJournalsSpec.scala create mode 100644 akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalActiveActiveSpec.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicaId.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala index 1c1315b459..3700e26ac6 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala @@ -13,6 +13,7 @@ import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.persistence.typed.PublishedEvent +import akka.persistence.typed.ReplicaId import scala.collection.JavaConverters._ @@ -53,7 +54,9 @@ object ActiveActiveShardingDirectReplication { * @param selfReplica The replica id of the replica that runs on this node * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system */ - def create[T](selfReplica: String, replicaShardingProxies: java.util.Map[String, ActorRef[T]]): Behavior[Command] = + def create[T]( + selfReplica: ReplicaId, + replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] = apply(selfReplica, replicaShardingProxies.asScala.toMap) /** @@ -61,7 +64,7 @@ object ActiveActiveShardingDirectReplication { * @param selfReplica The replica id of the replica that runs on this node * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system */ - def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] = + def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = Behaviors.setup[Command] { context => context.log.debug( "Subscribing to event stream to forward events to [{}] sharded replicas", diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala index 9b2653399e..bb85730a88 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala @@ -9,9 +9,11 @@ import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.eventstream.EventStream +import akka.persistence.typed import akka.persistence.typed.PersistenceId import akka.persistence.typed.PublishedEvent import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector } +import akka.persistence.typed.ReplicaId class ActiveActiveShardingDirectReplicationSpec extends ScalaTestWithActorTestKit @@ -27,20 +29,22 @@ class ActiveActiveShardingDirectReplicationSpec val replicationActor = spawn( ActiveActiveShardingDirectReplication( - "ReplicaA", - replicaShardingProxies = - Map("ReplicaA" -> replicaAProbe.ref, "ReplicaB" -> replicaBProbe.ref, "ReplicaC" -> replicaCProbe.ref))) + typed.ReplicaId("ReplicaA"), + replicaShardingProxies = Map( + ReplicaId("ReplicaA") -> replicaAProbe.ref, + ReplicaId("ReplicaB") -> replicaBProbe.ref, + ReplicaId("ReplicaC") -> replicaCProbe.ref))) val upProbe = createTestProbe[Done]() replicationActor ! ActiveActiveShardingDirectReplication.VerifyStarted(upProbe.ref) upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough val event = PublishedEventImpl( - PersistenceId.replicatedUniqueId("pid", "ReplicaA"), + PersistenceId.replicatedUniqueId("pid", ReplicaId("ReplicaA")), 1L, "event", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("ReplicaA", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty))) system.eventStream ! EventStream.Publish(event) replicaBProbe.receiveMessage().message should equal(event) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index 77dcd86a25..8738d8c206 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -4,6 +4,8 @@ package akka.persistence.testkit +import akka.actor.ActorLogging + import scala.collection.immutable import scala.concurrent.Future import scala.util.Try @@ -13,6 +15,7 @@ import akka.persistence._ import akka.persistence.journal.{ AsyncWriteJournal, Tagged } import akka.persistence.snapshot.SnapshotStore import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } +import akka.util.unused /** * INTERNAL API @@ -20,9 +23,12 @@ import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorag * Persistence testkit plugin for events. */ @InternalApi -class PersistenceTestKitPlugin extends AsyncWriteJournal { +class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends AsyncWriteJournal with ActorLogging { - private final val storage = InMemStorageExtension(context.system) + private final val storage = { + log.debug("Using in memory storage [{}] for test kit journal", cfgPath) + InMemStorageExtension(context.system).storageFor(cfgPath) + } private val eventStream = context.system.eventStream override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { @@ -60,7 +66,7 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal { object PersistenceTestKitPlugin { - val PluginId = "akka.persistence.testkit.journal.pluginid" + val PluginId = "akka.persistence.testkit.journal" import akka.util.ccompat.JavaConverters._ diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala index 119f1b9163..0871bf56bb 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala @@ -6,6 +6,8 @@ package akka.persistence.testkit import akka.annotation.{ ApiMayChange, InternalApi } +import scala.util.control.NoStackTrace + /** * Policies allow to emulate behavior of the storage (failures and rejections). * @@ -150,7 +152,7 @@ object ExpectedRejection extends ExpectedRejection { } -sealed abstract class ExpectedFailure extends Throwable +sealed abstract class ExpectedFailure extends Throwable with NoStackTrace object ExpectedFailure extends ExpectedFailure { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/SnapshotStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/SnapshotStorage.scala index 1c66d012b8..1ddeeb5df5 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/SnapshotStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/SnapshotStorage.scala @@ -4,8 +4,9 @@ package akka.persistence.testkit -import scala.util.Success +import akka.actor.Extension +import scala.util.Success import akka.annotation.InternalApi import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies @@ -15,7 +16,9 @@ import akka.persistence.testkit.internal.TestKitStorage * INTERNAL API */ @InternalApi -private[testkit] trait SnapshotStorage extends TestKitStorage[SnapshotOperation, (SnapshotMetadata, Any)] { +private[testkit] trait SnapshotStorage + extends TestKitStorage[SnapshotOperation, (SnapshotMetadata, Any)] + with Extension { import SnapshotStorage._ diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/InMemStorageExtension.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/InMemStorageExtension.scala index 83652f3b56..8454e74143 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/InMemStorageExtension.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/InMemStorageExtension.scala @@ -4,26 +4,55 @@ package akka.persistence.testkit.internal +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.Extension import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi import akka.persistence.testkit.EventStorage +import akka.persistence.testkit.JournalOperation +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.ProcessingPolicy import akka.persistence.testkit.scaladsl.PersistenceTestKit /** * INTERNAL API */ @InternalApi -private[testkit] object InMemStorageExtension extends ExtensionId[EventStorage] with ExtensionIdProvider { +private[testkit] object InMemStorageExtension extends ExtensionId[InMemStorageExtension] with ExtensionIdProvider { - override def get(system: ActorSystem): EventStorage = super.get(system) + override def get(system: ActorSystem): InMemStorageExtension = super.get(system) - override def createExtension(system: ExtendedActorSystem) = - if (PersistenceTestKit.Settings(system).serialize) { - new SerializedEventStorageImpl(system) - } else { - new SimpleEventStorageImpl - } + override def createExtension(system: ExtendedActorSystem): InMemStorageExtension = + new InMemStorageExtension(system) override def lookup = InMemStorageExtension } + +/** + * INTERNAL API + */ +@InternalApi +final class InMemStorageExtension(system: ExtendedActorSystem) extends Extension { + + private val stores = new ConcurrentHashMap[String, EventStorage]() + + def defaultStorage(): EventStorage = storageFor(PersistenceTestKitPlugin.PluginId) + + // shortcuts for default policy + def currentPolicy: ProcessingPolicy[JournalOperation] = defaultStorage().currentPolicy + def setPolicy(policy: ProcessingPolicy[JournalOperation]): Unit = defaultStorage().setPolicy(policy) + def resetPolicy(): Unit = defaultStorage().resetPolicy() + + def storageFor(key: String): EventStorage = + stores.computeIfAbsent(key, _ => { + // we don't really care about the key here, we just want separate instances + if (PersistenceTestKit.Settings(system).serialize) { + new SerializedEventStorageImpl(system) + } else { + new SimpleEventStorageImpl + } + }) + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala index 56e5b86153..7de3f8e9d3 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala @@ -4,7 +4,8 @@ package akka.persistence.testkit.internal -import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.actor.Extension +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi import akka.persistence.testkit.SnapshotStorage import akka.persistence.testkit.scaladsl.SnapshotTestKit diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala index 0b3fd01981..6f85dd7d83 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala @@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable -import akka.actor.Extension import akka.annotation.InternalApi import akka.persistence.testkit.ProcessingPolicy @@ -151,4 +150,4 @@ sealed trait PolicyOps[U] { * INTERNAL API */ @InternalApi -private[testkit] trait TestKitStorage[P, R] extends InMemStorage[String, R] with PolicyOps[P] with Extension +private[testkit] trait TestKitStorage[P, R] extends InMemStorage[String, R] with PolicyOps[P] diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala index dcd6246d40..cd0644590d 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala @@ -5,11 +5,13 @@ package akka.persistence.testkit.query import akka.actor.ExtendedActorSystem import akka.persistence.query.ReadJournalProvider +import com.typesafe.config.Config -class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem) extends ReadJournalProvider { +class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String) + extends ReadJournalProvider { override def scaladslReadJournal(): scaladsl.PersistenceTestKitReadJournal = - new scaladsl.PersistenceTestKitReadJournal(system) + new scaladsl.PersistenceTestKitReadJournal(system, config, configPath) override def javadslReadJournal(): javadsl.PersistenceTestKitReadJournal = new javadsl.PersistenceTestKitReadJournal(scaladslReadJournal()) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala index ebc50c233a..1df3f34bde 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -11,17 +11,27 @@ import akka.persistence.testkit.EventStorage import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage import akka.stream.scaladsl.Source +import akka.util.unused +import com.typesafe.config.Config +import org.slf4j.LoggerFactory object PersistenceTestKitReadJournal { val Identifier = "akka.persistence.testkit.query" } -final class PersistenceTestKitReadJournal(system: ExtendedActorSystem) +final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused config: Config, configPath: String) extends ReadJournal with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery { - private val storage: EventStorage = InMemStorageExtension(system) + private val log = LoggerFactory.getLogger(getClass) + + private val storage: EventStorage = { + // use shared path up to before `query` to identify which inmem journal we are addressing + val storagePluginId = configPath.replaceAll("""query$""", "journal") + log.debug("Using in memory storage [{}] for test kit read journal", storagePluginId) + InMemStorageExtension(system).storageFor(storagePluginId) + } override def eventsByPersistenceId( persistenceId: String, diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index 1aa3910860..2fa2033421 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -431,7 +431,7 @@ class PersistenceTestKit(system: ActorSystem) import PersistenceTestKit._ - override protected val storage = InMemStorageExtension(system) + override protected val storage = InMemStorageExtension(system).storageFor(PersistenceTestKitPlugin.PluginId) private final lazy val settings = Settings(system) diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/MultipleJournalsSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/MultipleJournalsSpec.scala new file mode 100644 index 0000000000..093ebb35a8 --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/MultipleJournalsSpec.scala @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.stream.scaladsl.Sink +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object MultipleJournalsSpec { + + object ListActor { + sealed trait Command + case class Save(text: String, replyTo: ActorRef[Done]) extends Command + case class ShowMeWhatYouGot(replyTo: ActorRef[Set[String]]) extends Command + case object Stop extends Command + + def apply(persistenceId: String, journal: String): Behavior[Command] = + EventSourcedBehavior[Command, String, Set[String]]( + PersistenceId.ofUniqueId(persistenceId), + Set.empty[String], + (state, cmd) => + cmd match { + case Save(text, replyTo) => + Effect.persist(text).thenRun(_ => replyTo ! Done) + case ShowMeWhatYouGot(replyTo) => + replyTo ! state + Effect.none + case Stop => + Effect.stop() + }, + (state, evt) => state + evt).withJournalPluginId(journal) + + } + + def config = ConfigFactory.parseString(s""" + journal1 { + # journal and query expected to be next to each other under config path + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + query = $${akka.persistence.testkit.query} + } + journal2 { + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + query = $${akka.persistence.testkit.query} + } + """).withFallback(ConfigFactory.load()).resolve() + +} + +class MultipleJournalsSpec + extends ScalaTestWithActorTestKit(MultipleJournalsSpec.config) + with AnyWordSpecLike + with LogCapturing { + + import MultipleJournalsSpec._ + + "The testkit journal and query plugin" must { + + "be possible to configure and use in multiple isolated instances" in { + val probe = createTestProbe[Any]() + + { + // one actor in each journal with same id + val j1 = spawn(ListActor("id1", "journal1.journal")) + val j2 = spawn(ListActor("id1", "journal2.journal")) + j1 ! ListActor.Save("j1m1", probe.ref) + probe.receiveMessage() + j2 ! ListActor.Save("j2m1", probe.ref) + probe.receiveMessage() + + j1 ! ListActor.Stop + probe.expectTerminated(j1) + j2 ! ListActor.Stop + probe.expectTerminated(j2) + } + + { + // new incarnations in each journal with same id + val j1 = spawn(ListActor("id1", "journal1.journal")) + val j2 = spawn(ListActor("id1", "journal2.journal")) + + // does not see each others events + j1 ! ListActor.ShowMeWhatYouGot(probe.ref) + probe.expectMessage(Set("j1m1")) + j2 ! ListActor.ShowMeWhatYouGot(probe.ref) + probe.expectMessage(Set("j2m1")) + } + + val readJournal1 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal1.query") + val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query") + + val eventsForJournal1 = + readJournal1.currentEventsByPersistenceId("id1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsForJournal1.map(_.event) should ===(Seq("j1m1")) + + val eventsForJournal2 = + readJournal2.currentEventsByPersistenceId("id1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsForJournal2.map(_.event) should ===(Seq("j2m1")) + } + + } + +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala index f844bb8538..56c8042ac8 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala @@ -26,9 +26,13 @@ object ActiveActiveEventPublishingSpec { case class Get(replyTo: ActorRef[Set[String]]) extends Command case object Stop extends Command - def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] = + def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup { ctx => - ActiveActiveEventSourcing(entityId, replicaId, allReplicas, PersistenceTestKitReadJournal.Identifier)( + ActiveActiveEventSourcing.withSharedJournal( + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier)( aactx => EventSourcedBehavior[Command, String, Set[String]]( aactx.persistenceId, @@ -57,6 +61,10 @@ class ActiveActiveEventPublishingSpec with AnyWordSpecLike with LogCapturing { + val DCA = ReplicaId("DC-A") + val DCB = ReplicaId("DC-B") + val DCC = ReplicaId("DC-C") + private var idCounter = 0 def nextEntityId(): String = { idCounter += 1 @@ -68,18 +76,18 @@ class ActiveActiveEventPublishingSpec "An active active actor" must { "move forward when a published event from a replica is received" in { val id = nextEntityId() - val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) + val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 1L, "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -89,18 +97,18 @@ class ActiveActiveEventPublishingSpec "ignore a published event from a replica is received but the sequence number is unexpected" in { val id = nextEntityId() - val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) + val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 2L, // missing 1L "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -110,18 +118,18 @@ class ActiveActiveEventPublishingSpec "ignore a published event from an unknown replica" in { val id = nextEntityId() - val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) + val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-C"), + PersistenceId.replicatedUniqueId(id, DCC), 1L, "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-C", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -131,25 +139,25 @@ class ActiveActiveEventPublishingSpec "ignore an already seen event from a replica" in { val id = nextEntityId() - val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) + val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId("myId4", "DC-B"), + PersistenceId.replicatedUniqueId("myId4", DCB), 1L, "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) // simulate another published event from that replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 1L, "two-again", // ofc this would be the same in the real world, different just so we can detect System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -161,7 +169,7 @@ class ActiveActiveEventPublishingSpec "handle published events after replay" in { val id = nextEntityId() val probe = createTestProbe[Any]() - val activeActiveBehavior = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")) + val activeActiveBehavior = MyActiveActive(id, DCA, Set(DCA, DCB)) val incarnation1 = spawn(activeActiveBehavior) incarnation1 ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) @@ -177,11 +185,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 1L, "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) incarnation2 ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -193,18 +201,18 @@ class ActiveActiveEventPublishingSpec "handle published events before and after replay" in { val id = nextEntityId() val probe = createTestProbe[Any]() - val activeActiveBehaviorA = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")) + val activeActiveBehaviorA = MyActiveActive(id, DCA, Set(DCA, DCB)) val incarnationA1 = spawn(activeActiveBehaviorA) incarnationA1 ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) // simulate a published event from another replica incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 1L, "two", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) incarnationA1 ! MyActiveActive.Stop probe.expectTerminated(incarnationA1) @@ -213,11 +221,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedUniqueId(id, "DC-B"), + PersistenceId.replicatedUniqueId(id, DCB), 2L, "three", System.currentTimeMillis(), - Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) + Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) incarnationA2 ! MyActiveActive.Add("four", probe.ref) probe.expectMessage(Done) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala index 58effb19cf..e9f33b0bc2 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala @@ -7,23 +7,27 @@ package akka.persistence.typed import java.util.concurrent.atomic.AtomicInteger import akka.Done -import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } -import akka.actor.typed.{ ActorRef, Behavior } +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal -import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior } +import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior import org.scalatest.concurrent.Eventually import org.scalatest.wordspec.AnyWordSpecLike object ActiveActiveSpec { - val AllReplicas = Set("R1", "R2", "R3") + val AllReplicas = Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3")) sealed trait Command case class GetState(replyTo: ActorRef[State]) extends Command case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command - case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command + case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command case object Stop extends Command case class State(all: List[String]) @@ -34,7 +38,11 @@ object ActiveActiveSpec { entityId: String, replicaId: String, probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] = - ActiveActiveEventSourcing(entityId, replicaId, AllReplicas, PersistenceTestKitReadJournal.Identifier)( + ActiveActiveEventSourcing.withSharedJournal( + entityId, + ReplicaId(replicaId), + AllReplicas, + PersistenceTestKitReadJournal.Identifier)( aaContext => EventSourcedBehavior[Command, String, State]( aaContext.persistenceId, @@ -61,7 +69,7 @@ object ActiveActiveSpec { } -case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean, concurrent: Boolean) +case class EventAndContext(event: Any, origin: ReplicaId, recoveryRunning: Boolean, concurrent: Boolean) class ActiveActiveSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) @@ -145,10 +153,10 @@ class ActiveActiveSpec "have access to replica information" in { val entityId = nextEntityId - val probe = createTestProbe[(String, Set[String])]() + val probe = createTestProbe[(ReplicaId, Set[ReplicaId])]() val r1 = spawn(testBehavior(entityId, "R1")) r1 ! GetReplica(probe.ref) - probe.expectMessage(("R1", Set("R1", "R2", "R3"))) + probe.expectMessage((ReplicaId("R1"), Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3")))) } "have access to event origin" in { @@ -161,12 +169,12 @@ class ActiveActiveSpec val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) r1 ! StoreMe("from r1", replyProbe.ref) - eventProbeR2.expectMessage(EventAndContext("from r1", "R1", false, false)) - eventProbeR1.expectMessage(EventAndContext("from r1", "R1", false, false)) + eventProbeR2.expectMessage(EventAndContext("from r1", ReplicaId("R1"), false, false)) + eventProbeR1.expectMessage(EventAndContext("from r1", ReplicaId("R1"), false, false)) r2 ! StoreMe("from r2", replyProbe.ref) - eventProbeR1.expectMessage(EventAndContext("from r2", "R2", false, false)) - eventProbeR2.expectMessage(EventAndContext("from r2", "R2", false, false)) + eventProbeR1.expectMessage(EventAndContext("from r2", ReplicaId("R2"), false, false)) + eventProbeR2.expectMessage(EventAndContext("from r2", ReplicaId("R2"), false, false)) } "set recovery running" in { @@ -175,12 +183,12 @@ class ActiveActiveSpec val replyProbe = createTestProbe[Done]() val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) r1 ! StoreMe("Event", replyProbe.ref) - eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false, false)) + eventProbeR1.expectMessage(EventAndContext("Event", ReplicaId("R1"), recoveryRunning = false, false)) replyProbe.expectMessage(Done) val recoveryProbe = createTestProbe[EventAndContext]() spawn(testBehavior(entityId, "R1", recoveryProbe.ref)) - recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true, false)) + recoveryProbe.expectMessage(EventAndContext("Event", ReplicaId("R1"), recoveryRunning = true, false)) } "persist all" in { @@ -197,10 +205,10 @@ class ActiveActiveSpec // events at r2 happened concurrently with events at r1 - eventProbeR1.expectMessage(EventAndContext("1 from r1", "R1", false, concurrent = false)) - eventProbeR1.expectMessage(EventAndContext("2 from r1", "R1", false, concurrent = false)) - eventProbeR1.expectMessage(EventAndContext("1 from r2", "R2", false, concurrent = true)) - eventProbeR1.expectMessage(EventAndContext("2 from r2", "R2", false, concurrent = true)) + eventProbeR1.expectMessage(EventAndContext("1 from r1", ReplicaId("R1"), false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("2 from r1", ReplicaId("R1"), false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("1 from r2", ReplicaId("R2"), false, concurrent = true)) + eventProbeR1.expectMessage(EventAndContext("2 from r2", ReplicaId("R2"), false, concurrent = true)) eventually { val probe = createTestProbe[State]() @@ -225,12 +233,16 @@ class ActiveActiveSpec r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 // each gets its local event - eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false)) - eventProbeR2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = false)) + eventProbeR1.expectMessage( + EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false)) + eventProbeR2.expectMessage( + EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = false)) // then the replicated remote events, which will be concurrent - eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true)) - eventProbeR2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = true)) + eventProbeR1.expectMessage( + EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = true)) + eventProbeR2.expectMessage( + EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = true)) // state is updated eventually { @@ -246,11 +258,11 @@ class ActiveActiveSpec // Neither of these should be concurrent, nothing happening at r2 r1 ! StoreMe("from r1 2", probe.ref) // R1 1 R2 1 - eventProbeR1.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false)) - eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, concurrent = false)) r1 ! StoreMe("from r1 3", probe.ref) // R2 2 R2 1 - eventProbeR1.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false)) - eventProbeR2.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("from r1 3", ReplicaId("R1"), false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r1 3", ReplicaId("R1"), false, concurrent = false)) eventually { val probe = createTestProbe[State]() r2 ! GetState(probe.ref) @@ -259,8 +271,8 @@ class ActiveActiveSpec // not concurrent as the above asserts mean that all events are fully replicated r2 ! StoreMe("from r2 2", probe.ref) - eventProbeR1.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false)) - eventProbeR2.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, concurrent = false)) eventually { val probe = createTestProbe[State]() r1 ! GetState(probe.ref) @@ -286,8 +298,8 @@ class ActiveActiveSpec probe.expectMessage(Done) // r2 - eventProbeR2.expectMessage(EventAndContext("from r1 1", "R1", false, false)) - eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, false)) + eventProbeR2.expectMessage(EventAndContext("from r1 1", ReplicaId("R1"), false, false)) + eventProbeR2.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, false)) r2 ! StoreMe("from r2 1", probe.ref) probe.expectMessage(Done) @@ -297,10 +309,10 @@ class ActiveActiveSpec // r3 should only get the events 1, not R2s stored version of them val eventProbeR3 = createTestProbe[EventAndContext]() spawn(testBehavior(entityId, "R3", eventProbeR3.ref)) - eventProbeR3.expectMessage(EventAndContext("from r1 1", "R1", false, false)) - eventProbeR3.expectMessage(EventAndContext("from r1 2", "R1", false, false)) - eventProbeR3.expectMessage(EventAndContext("from r2 1", "R2", false, false)) - eventProbeR3.expectMessage(EventAndContext("from r2 2", "R2", false, false)) + eventProbeR3.expectMessage(EventAndContext("from r1 1", ReplicaId("R1"), false, false)) + eventProbeR3.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, false)) + eventProbeR3.expectMessage(EventAndContext("from r2 1", ReplicaId("R2"), false, false)) + eventProbeR3.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, false)) eventProbeR3.expectNoMessage() } @@ -313,14 +325,18 @@ class ActiveActiveSpec r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0 r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 // local event isn't concurrent, remote event is - eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false)) - eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true)) + eventProbeR1.expectMessage( + EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false)) + eventProbeR1.expectMessage( + EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = true)) // take 2 val eventProbeR1Take2 = createTestProbe[EventAndContext]() spawn(testBehavior(entityId, "R1", eventProbeR1Take2.ref)) - eventProbeR1Take2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = true, concurrent = false)) - eventProbeR1Take2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = true, concurrent = true)) + eventProbeR1Take2.expectMessage( + EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = true, concurrent = false)) + eventProbeR1Take2.expectMessage( + EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = true, concurrent = true)) } } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalActiveActiveSpec.scala new file mode 100644 index 0000000000..d4e0803997 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalActiveActiveSpec.scala @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.stream.scaladsl.Sink +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually +import org.scalatest.wordspec.AnyWordSpecLike + +object MultiJournalActiveActiveSpec { + + object Actor { + sealed trait Command + case class GetState(replyTo: ActorRef[Set[String]]) extends Command + case class StoreMe(text: String, ack: ActorRef[Done]) extends Command + + private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal") + def apply(entityId: String, replicaId: String): Behavior[Command] = { + ActiveActiveEventSourcing( + entityId, + ReplicaId(replicaId), + Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))( + aaContext => + EventSourcedBehavior[Command, String, Set[String]]( + aaContext.persistenceId, + Set.empty[String], + (state, command) => + command match { + case GetState(replyTo) => + replyTo ! state + Effect.none + case StoreMe(evt, ack) => + Effect.persist(evt).thenRun(_ => ack ! Done) + }, + (state, event) => state + event)).withJournalPluginId(writeJournalPerReplica(replicaId)) + } + } + + def separateJournalsConfig: Config = ConfigFactory.parseString(s""" + journal1 { + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + query = $${akka.persistence.testkit.query} + } + journal2 { + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + query = $${akka.persistence.testkit.query} + } + """).withFallback(ConfigFactory.load()).resolve() + +} + +class MultiJournalActiveActiveSpec + extends ScalaTestWithActorTestKit(MultiJournalActiveActiveSpec.separateJournalsConfig) + with AnyWordSpecLike + with LogCapturing + with Eventually { + import MultiJournalActiveActiveSpec._ + val ids = new AtomicInteger(0) + def nextEntityId = s"e-${ids.getAndIncrement()}" + "ActiveActiveEventSourcing" should { + "support one journal per replica" in { + + val r1 = spawn(Actor("id1", "R1")) + val r2 = spawn(Actor("id1", "R2")) + + val probe = createTestProbe[Any]() + r1 ! Actor.StoreMe("r1 m1", probe.ref) + probe.expectMessage(Done) + + r2 ! Actor.StoreMe("r2 m1", probe.ref) + probe.expectMessage(Done) + + eventually { + val probe = createTestProbe[Set[String]]() + r1 ! Actor.GetState(probe.ref) + probe.receiveMessage() should ===(Set("r1 m1", "r2 m1")) + + r2 ! Actor.GetState(probe.ref) + probe.receiveMessage() should ===(Set("r1 m1", "r2 m1")) + } + + val readJournal1 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal1.query") + val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query") + + val eventsForJournal1 = + readJournal1.currentEventsByPersistenceId("id1|R1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsForJournal1.map(_.event).toSet should ===(Set("r1 m1", "r2 m1")) + + val eventsForJournal2 = + readJournal2.currentEventsByPersistenceId("id1|R2", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsForJournal2.map(_.event).toSet should ===(Set("r1 m1", "r2 m1")) + + } + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala index 4577431cb6..65dcbf9a14 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala @@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ } import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicaId import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior } import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.{ Eventually, ScalaFutures } @@ -21,7 +22,7 @@ object AAAuctionExampleSpec { type MoneyAmount = Int - case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) + case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId) // commands sealed trait AuctionCommand @@ -33,12 +34,13 @@ object AAAuctionExampleSpec { sealed trait AuctionEvent extends CborSerializable final case class BidRegistered(bid: Bid) extends AuctionEvent - final case class AuctionFinished(atDc: String) extends AuctionEvent - final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent + final case class AuctionFinished(atDc: ReplicaId) extends AuctionEvent + final case class WinnerDecided(atDc: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount) + extends AuctionEvent sealed trait AuctionPhase case object Running extends AuctionPhase - final case class Closing(finishedAtDc: Set[String]) extends AuctionPhase + final case class Closing(finishedAtDc: Set[ReplicaId]) extends AuctionPhase case object Closed extends AuctionPhase case class AuctionState( @@ -85,8 +87,8 @@ object AAAuctionExampleSpec { // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. - (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo( - second.originDc) < 0) + (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id + .compareTo(second.originDc.id) < 0) } case class AuctionSetup( @@ -94,7 +96,7 @@ object AAAuctionExampleSpec { initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner closingAt: Instant, responsibleForClosing: Boolean, - allDcs: Set[String]) + allDcs: Set[ReplicaId]) def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)( state: AuctionState, @@ -199,15 +201,16 @@ object AAAuctionExampleSpec { def initialState(setup: AuctionSetup) = AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer) - def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { + def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { ctx => - ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx => - EventSourcedBehavior( - aaCtx.persistenceId, - initialState(setup), - commandHandler(setup, ctx, aaCtx), - eventHandler(ctx, aaCtx, setup)) - } + ActiveActiveEventSourcing + .withSharedJournal(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx => + EventSourcedBehavior( + aaCtx.persistenceId, + initialState(setup), + commandHandler(setup, ctx, aaCtx), + eventHandler(ctx, aaCtx, setup)) + } } } @@ -223,19 +226,19 @@ class AAAuctionExampleSpec "Auction example" should { "work" in { - val Replicas = Set("DC-A", "DC-B") + val Replicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B")) val setupA = AuctionSetup( "old-skis", - Bid("chbatey", 12, Instant.now(), "DC-A"), + Bid("chbatey", 12, Instant.now(), ReplicaId("DC-A")), Instant.now().plusSeconds(10), responsibleForClosing = true, Replicas) val setupB = setupA.copy(responsibleForClosing = false) - val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-A", setupA)) - val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-B", setupB)) + val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-A"), setupA)) + val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-B"), setupB)) dcAReplica ! OfferBid("me", 100) dcAReplica ! OfferBid("me", 99) diff --git a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala index 99106b5c71..6c297f4ea1 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala @@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicaId import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.{ Eventually, ScalaFutures } @@ -24,7 +25,7 @@ object AABlogExampleSpec { copy(content = Some(newContent), contentTimestamp = timestamp) def isEmpty: Boolean = content.isEmpty } - val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ""), published = false) + val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false) final case class PostContent(title: String, body: String) final case class PostSummary(postId: String, title: String) @@ -110,20 +111,30 @@ class AABlogExampleSpec "Blog Example" should { "work" in { val refDcA: ActorRef[BlogCommand] = - spawn(Behaviors.setup[BlogCommand] { ctx => - ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) { - (aa: ActiveActiveContext) => + spawn( + Behaviors.setup[BlogCommand] { ctx => + ActiveActiveEventSourcing.withSharedJournal( + "cat", + ReplicaId("DC-A"), + Set(ReplicaId("DC-A"), ReplicaId("DC-B")), + PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) => behavior(aa, ctx) - } - }, "dc-a") + } + }, + "dc-a") val refDcB: ActorRef[BlogCommand] = - spawn(Behaviors.setup[BlogCommand] { ctx => - ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) { - (aa: ActiveActiveContext) => + spawn( + Behaviors.setup[BlogCommand] { ctx => + ActiveActiveEventSourcing.withSharedJournal( + "cat", + ReplicaId("DC-B"), + Set(ReplicaId("DC-A"), ReplicaId("DC-B")), + PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) => behavior(aa, ctx) - } - }, "dc-b") + } + }, + "dc-b") import akka.actor.typed.scaladsl.AskPattern._ import akka.util.Timeout diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala index 7614edec35..6018759038 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala @@ -130,16 +130,16 @@ object PersistenceId { * Constructs a persistence id from a unique entity id that includes the replica id. */ @InternalApi - private[akka] def replicatedUniqueId(entityId: String, replicaId: String): PersistenceId = { + private[akka] def replicatedUniqueId(entityId: String, replicaId: ReplicaId): PersistenceId = { if (entityId.contains(DefaultSeparator)) throw new IllegalArgumentException( s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character") - if (replicaId.contains(DefaultSeparator)) + if (replicaId.id.contains(DefaultSeparator)) throw new IllegalArgumentException( - s"replicaId [$replicaId] contains [$DefaultSeparator] which is a reserved character") + s"replicaId [${replicaId.id}] contains [$DefaultSeparator] which is a reserved character") - new PersistenceId(entityId + DefaultSeparator + replicaId) + new PersistenceId(entityId + DefaultSeparator + replicaId.id) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicaId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicaId.scala new file mode 100644 index 0000000000..b01082a980 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicaId.scala @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Identifies a replica in Active Active eventsourcing, could be a datacenter name or a logical identifier. + */ +final case class ReplicaId(id: String) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 20281bace5..4528b602c4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -12,6 +12,7 @@ import akka.actor.typed.Signal import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.ReplicaId import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter } import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria } @@ -61,7 +62,7 @@ private[akka] final class BehaviorSetup[C, E, S]( val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId) val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) - val replicaId: Option[String] = activeActive.map(_.replicaId) + val replicaId: Option[ReplicaId] = activeActive.map(_.replicaId) def selfClassic: ClassicActorRef = context.self.toClassic diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 5c46366211..95e474860e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -35,6 +35,7 @@ import akka.persistence.typed.EventAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId import akka.persistence.typed.PublishedEvent +import akka.persistence.typed.ReplicaId import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed @@ -252,10 +253,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override private[akka] def withActiveActive( context: ActiveActiveContextImpl, - id: String, - allIds: Set[String], - queryPluginId: String): EventSourcedBehavior[Command, Event, State] = { - copy(activeActive = Some(ActiveActive(id, allIds, context, queryPluginId))) + replicaId: ReplicaId, + allReplicaIdsAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] = { + copy(activeActive = Some(ActiveActive(replicaId, allReplicaIdsAndQueryPlugins, context))) } } @@ -282,7 +282,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( */ @InternalApi private[akka] final case class ReplicatedEventMetaData( - originReplica: String, + originReplica: ReplicaId, originSequenceNr: Long, version: VersionVector, concurrent: Boolean) // whether when the event handler was executed the event was concurrent @@ -296,13 +296,13 @@ private[akka] final case class ReplicatedEventMetaData( @InternalApi private[akka] final case class ReplicatedEvent[E]( event: E, - originReplica: String, + originReplica: ReplicaId, originSequenceNr: Long, originVersion: VersionVector) @InternalApi private[akka] case object ReplicatedEventAck -final class ReplicatedPublishedEventMetaData(val replicaId: String, private[akka] val version: VersionVector) +final class ReplicatedPublishedEventMetaData(val replicaId: ReplicaId, private[akka] val version: VersionVector) /** * INTERNAL API diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index d2612bce34..96b90d565f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -18,6 +18,7 @@ import akka.persistence.typed.EmptyEventSeq import akka.persistence.typed.EventsSeq import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.ReplicaId import akka.persistence.typed.SingleEventSeq import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState import akka.persistence.typed.internal.ReplayingEvents.ReplayingState @@ -52,7 +53,7 @@ private[akka] object ReplayingEvents { receivedPoisonPill: Boolean, recoveryStartTime: Long, version: VersionVector, - seenSeqNrPerReplica: Map[String, Long]) + seenSeqNrPerReplica: Map[ReplicaId, Long]) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = Behaviors.setup { _ => @@ -121,7 +122,7 @@ private[akka] final class ReplayingEvents[C, E, S]( eventForErrorReporting = OptionVal.Some(event) state = state.copy(seqNr = repr.sequenceNr) - val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, String)] = + val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, ReplicaId)] = setup.activeActive match { case Some(aa) => val meta = repr.metadata match { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 7c6af4a82a..c0d83579c5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -34,6 +34,7 @@ import akka.persistence.SnapshotProtocol import akka.persistence.journal.Tagged import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery +import akka.persistence.typed.ReplicaId import akka.persistence.typed.{ DeleteEventsCompleted, DeleteEventsFailed, @@ -91,8 +92,8 @@ private[akka] object Running { state: State, receivedPoisonPill: Boolean, version: VersionVector, - seenPerReplica: Map[String, Long], - replicationControl: Map[String, ReplicationStreamControl]) { + seenPerReplica: Map[ReplicaId, Long], + replicationControl: Map[ReplicaId, ReplicationStreamControl]) { def nextSequenceNr(): RunningState[State] = copy(seqNr = seqNr + 1) @@ -128,8 +129,8 @@ private[akka] object Running { if (replicaId != aa.replicaId) { val seqNr = state.seenPerReplica(replicaId) val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replicaId) - // FIXME support different configuration per replica https://github.com/akka/akka/issues/29257 - val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId) + val queryPluginId = aa.allReplicasAndQueryPlugins(replicaId) + val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId) implicit val timeout = Timeout(30.seconds) @@ -424,7 +425,7 @@ private[akka] object Running { val newState2 = setup.activeActive match { case Some(aa) => - val updatedVersion = newState.version.updated(aa.replicaId, _currentSequenceNumber) + val updatedVersion = newState.version.updated(aa.replicaId.id, _currentSequenceNumber) val r = internalPersist( setup.context, cmd, @@ -475,7 +476,7 @@ private[akka] object Running { val adaptedEvent = adaptEvent(event) val eventMetadata = metadataTemplate match { case Some(template) => - val updatedVersion = currentState.version.updated(template.originReplica, _currentSequenceNumber) + val updatedVersion = currentState.version.updated(template.originReplica.id, _currentSequenceNumber) setup.log.trace("Processing event [{}] with version vector [{}]", event, updatedVersion) currentState = currentState.copy(version = updatedVersion) Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion)) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala index 888c7e3487..77340336f5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -5,20 +5,21 @@ package akka.persistence.typed.scaladsl import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId import akka.util.WallClock /** * Utility class for comparing timestamp and data center * identifier when implementing last-writer wins. */ -final case class LwwTime(timestamp: Long, originDc: String) { +final case class LwwTime(timestamp: Long, originDc: ReplicaId) { /** * Create a new `LwwTime` that has a `timestamp` that is * `max` of the given timestamp and previous timestamp + 1, * i.e. monotonically increasing. */ - def increase(t: Long, replicaId: String): LwwTime = + def increase(t: Long, replicaId: ReplicaId): LwwTime = LwwTime(math.max(timestamp + 1, t), replicaId) /** @@ -30,7 +31,7 @@ final case class LwwTime(timestamp: Long, originDc: String) { def isAfter(other: LwwTime): Boolean = { if (timestamp > other.timestamp) true else if (timestamp < other.timestamp) false - else if (other.originDc.compareTo(originDc) > 0) true + else if (other.originDc.id.compareTo(originDc.id) > 0) true else false } } @@ -38,10 +39,10 @@ final case class LwwTime(timestamp: Long, originDc: String) { // FIXME docs trait ActiveActiveContext { - def origin: String + def origin: ReplicaId def concurrent: Boolean - def replicaId: String - def allReplicas: Set[String] + def replicaId: ReplicaId + def allReplicas: Set[ReplicaId] def persistenceId: PersistenceId def recoveryRunning: Boolean def entityId: String @@ -52,9 +53,13 @@ trait ActiveActiveContext { // FIXME, parts of this can be set during initialisation // Other fields will be set before executing the event handler as they change per event // https://github.com/akka/akka/issues/29258 -private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String]) +private[akka] class ActiveActiveContextImpl( + val entityId: String, + val replicaId: ReplicaId, + val replicasAndQueryPlugins: Map[ReplicaId, String]) extends ActiveActiveContext { - var _origin: String = null + val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet + var _origin: ReplicaId = null var _recoveryRunning: Boolean = false var _concurrent: Boolean = false @@ -64,7 +69,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: * The origin of the current event. * Undefined result if called from anywhere other than an event handler. */ - override def origin: String = _origin + override def origin: ReplicaId = _origin /** * Whether the happened concurrently with an event from another replica. @@ -83,7 +88,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: object ActiveActiveEventSourcing { /** - * Initialize a replicated event sourced behavior. + * Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal. * * Events from each replica for the same entityId will be replicated to every copy. * Care must be taken to handle events in any order as events can happen concurrently at different replicas. @@ -93,21 +98,43 @@ object ActiveActiveEventSourcing { * A different journal plugin id can be configured using withJournalPluginId after creation. Different databases * can be used for each replica. * The events from other replicas are read using PersistentQuery. - * TODO support a different query plugin id per replicas: https://github.com/akka/akka/issues/29257 * * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. - * @param queryPluginId Used to read the events from other replicas. Must be the query side of your configured journal plugin. - * @return + * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. + */ + def withSharedJournal[Command, Event, State]( + entityId: String, + replicaId: ReplicaId, + allReplicaIds: Set[ReplicaId], + queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) + : EventSourcedBehavior[Command, Event, State] = + apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(activeActiveContext) + + /** + * Initialize a replicated event sourced behavior. + * + * Events from each replica for the same entityId will be replicated to every copy. + * Care must be taken to handle events in any order as events can happen concurrently at different replicas. + * + * Using an replicated event sourced behavior means there is no longer the single writer guarantee. + * + * The journal plugin id for the entity itself can be configured using withJournalPluginId after creation. + * A query side identifier is passed per replica allowing for separate database/journal configuration per + * replica. The events from other replicas are read using PersistentQuery. + * + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas + * and configured with the query plugin for the journal that each replica uses. */ def apply[Command, Event, State]( entityId: String, - replicaId: String, - allReplicaIds: Set[String], - queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) + replicaId: ReplicaId, + allReplicasAndQueryPlugins: Map[ReplicaId, String])( + activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = { - val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicaIds) - activeActiveContext(context).withActiveActive(context, replicaId, allReplicaIds, queryPluginId) + val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins) + activeActiveContext(context).withActiveActive(context, replicaId, allReplicasAndQueryPlugins) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index c7357ac198..56dcf975cf 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -16,6 +16,7 @@ import akka.annotation.ApiMayChange import akka.annotation.{ DoNotInherit, InternalApi } import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.internal._ @@ -25,15 +26,16 @@ object EventSourcedBehavior { // FIXME move to internal @InternalApi private[akka] final case class ActiveActive( - replicaId: String, - allReplicas: Set[String], - aaContext: ActiveActiveContextImpl, - queryPluginId: String) { + replicaId: ReplicaId, + allReplicasAndQueryPlugins: Map[ReplicaId, String], + aaContext: ActiveActiveContextImpl) { + + val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet /** * Must only be called on the same thread that will execute the user code */ - def setContext(recoveryRunning: Boolean, originReplica: String, concurrent: Boolean): Unit = { + def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = { aaContext._recoveryRunning = recoveryRunning aaContext._concurrent = concurrent aaContext._origin = originReplica @@ -166,9 +168,8 @@ object EventSourcedBehavior { private[akka] def withActiveActive( context: ActiveActiveContextImpl, - replicaId: String, - allReplicaIds: Set[String], - queryPluginId: String): EventSourcedBehavior[Command, Event, State] + replicaId: ReplicaId, + allReplicasAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] /** * Change the snapshot store plugin id that this actor should use.