Active active per replica journal selection (#29350)

* Support for having multiple isolated testkit journals and corresponding read journals in persistence testkit
* Read journal selection per active-active replica 
* a dedicated ReplicaId type to avoid stringly typed mismatches
This commit is contained in:
Johan Andrén 2020-07-08 11:20:45 +02:00 committed by Christopher Batey
parent 398ab2efe0
commit 36a8b6f24a
25 changed files with 532 additions and 168 deletions

View file

@ -13,6 +13,7 @@ import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.ReplicaId
import scala.collection.JavaConverters._
@ -53,7 +54,9 @@ object ActiveActiveShardingDirectReplication {
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def create[T](selfReplica: String, replicaShardingProxies: java.util.Map[String, ActorRef[T]]): Behavior[Command] =
def create[T](
selfReplica: ReplicaId,
replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(selfReplica, replicaShardingProxies.asScala.toMap)
/**
@ -61,7 +64,7 @@ object ActiveActiveShardingDirectReplication {
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] =
def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
Behaviors.setup[Command] { context =>
context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas",

View file

@ -9,9 +9,11 @@ import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.eventstream.EventStream
import akka.persistence.typed
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.ReplicaId
class ActiveActiveShardingDirectReplicationSpec
extends ScalaTestWithActorTestKit
@ -27,20 +29,22 @@ class ActiveActiveShardingDirectReplicationSpec
val replicationActor = spawn(
ActiveActiveShardingDirectReplication(
"ReplicaA",
replicaShardingProxies =
Map("ReplicaA" -> replicaAProbe.ref, "ReplicaB" -> replicaBProbe.ref, "ReplicaC" -> replicaCProbe.ref)))
typed.ReplicaId("ReplicaA"),
replicaShardingProxies = Map(
ReplicaId("ReplicaA") -> replicaAProbe.ref,
ReplicaId("ReplicaB") -> replicaBProbe.ref,
ReplicaId("ReplicaC") -> replicaCProbe.ref)))
val upProbe = createTestProbe[Done]()
replicationActor ! ActiveActiveShardingDirectReplication.VerifyStarted(upProbe.ref)
upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough
val event = PublishedEventImpl(
PersistenceId.replicatedUniqueId("pid", "ReplicaA"),
PersistenceId.replicatedUniqueId("pid", ReplicaId("ReplicaA")),
1L,
"event",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("ReplicaA", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty)))
system.eventStream ! EventStream.Publish(event)
replicaBProbe.receiveMessage().message should equal(event)

View file

@ -4,6 +4,8 @@
package akka.persistence.testkit
import akka.actor.ActorLogging
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try
@ -13,6 +15,7 @@ import akka.persistence._
import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
import akka.util.unused
/**
* INTERNAL API
@ -20,9 +23,12 @@ import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorag
* Persistence testkit plugin for events.
*/
@InternalApi
class PersistenceTestKitPlugin extends AsyncWriteJournal {
class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends AsyncWriteJournal with ActorLogging {
private final val storage = InMemStorageExtension(context.system)
private final val storage = {
log.debug("Using in memory storage [{}] for test kit journal", cfgPath)
InMemStorageExtension(context.system).storageFor(cfgPath)
}
private val eventStream = context.system.eventStream
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
@ -60,7 +66,7 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal {
object PersistenceTestKitPlugin {
val PluginId = "akka.persistence.testkit.journal.pluginid"
val PluginId = "akka.persistence.testkit.journal"
import akka.util.ccompat.JavaConverters._

View file

@ -6,6 +6,8 @@ package akka.persistence.testkit
import akka.annotation.{ ApiMayChange, InternalApi }
import scala.util.control.NoStackTrace
/**
* Policies allow to emulate behavior of the storage (failures and rejections).
*
@ -150,7 +152,7 @@ object ExpectedRejection extends ExpectedRejection {
}
sealed abstract class ExpectedFailure extends Throwable
sealed abstract class ExpectedFailure extends Throwable with NoStackTrace
object ExpectedFailure extends ExpectedFailure {

View file

@ -4,8 +4,9 @@
package akka.persistence.testkit
import scala.util.Success
import akka.actor.Extension
import scala.util.Success
import akka.annotation.InternalApi
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
@ -15,7 +16,9 @@ import akka.persistence.testkit.internal.TestKitStorage
* INTERNAL API
*/
@InternalApi
private[testkit] trait SnapshotStorage extends TestKitStorage[SnapshotOperation, (SnapshotMetadata, Any)] {
private[testkit] trait SnapshotStorage
extends TestKitStorage[SnapshotOperation, (SnapshotMetadata, Any)]
with Extension {
import SnapshotStorage._

View file

@ -4,26 +4,55 @@
package akka.persistence.testkit.internal
import java.util.concurrent.ConcurrentHashMap
import akka.actor.Extension
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.JournalOperation
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.ProcessingPolicy
import akka.persistence.testkit.scaladsl.PersistenceTestKit
/**
* INTERNAL API
*/
@InternalApi
private[testkit] object InMemStorageExtension extends ExtensionId[EventStorage] with ExtensionIdProvider {
private[testkit] object InMemStorageExtension extends ExtensionId[InMemStorageExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): EventStorage = super.get(system)
override def get(system: ActorSystem): InMemStorageExtension = super.get(system)
override def createExtension(system: ExtendedActorSystem) =
if (PersistenceTestKit.Settings(system).serialize) {
new SerializedEventStorageImpl(system)
} else {
new SimpleEventStorageImpl
}
override def createExtension(system: ExtendedActorSystem): InMemStorageExtension =
new InMemStorageExtension(system)
override def lookup = InMemStorageExtension
}
/**
* INTERNAL API
*/
@InternalApi
final class InMemStorageExtension(system: ExtendedActorSystem) extends Extension {
private val stores = new ConcurrentHashMap[String, EventStorage]()
def defaultStorage(): EventStorage = storageFor(PersistenceTestKitPlugin.PluginId)
// shortcuts for default policy
def currentPolicy: ProcessingPolicy[JournalOperation] = defaultStorage().currentPolicy
def setPolicy(policy: ProcessingPolicy[JournalOperation]): Unit = defaultStorage().setPolicy(policy)
def resetPolicy(): Unit = defaultStorage().resetPolicy()
def storageFor(key: String): EventStorage =
stores.computeIfAbsent(key, _ => {
// we don't really care about the key here, we just want separate instances
if (PersistenceTestKit.Settings(system).serialize) {
new SerializedEventStorageImpl(system)
} else {
new SimpleEventStorageImpl
}
})
}

View file

@ -4,7 +4,8 @@
package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.actor.Extension
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.persistence.testkit.SnapshotStorage
import akka.persistence.testkit.scaladsl.SnapshotTestKit

View file

@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable
import akka.actor.Extension
import akka.annotation.InternalApi
import akka.persistence.testkit.ProcessingPolicy
@ -151,4 +150,4 @@ sealed trait PolicyOps[U] {
* INTERNAL API
*/
@InternalApi
private[testkit] trait TestKitStorage[P, R] extends InMemStorage[String, R] with PolicyOps[P] with Extension
private[testkit] trait TestKitStorage[P, R] extends InMemStorage[String, R] with PolicyOps[P]

View file

@ -5,11 +5,13 @@
package akka.persistence.testkit.query
import akka.actor.ExtendedActorSystem
import akka.persistence.query.ReadJournalProvider
import com.typesafe.config.Config
class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem) extends ReadJournalProvider {
class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String)
extends ReadJournalProvider {
override def scaladslReadJournal(): scaladsl.PersistenceTestKitReadJournal =
new scaladsl.PersistenceTestKitReadJournal(system)
new scaladsl.PersistenceTestKitReadJournal(system, config, configPath)
override def javadslReadJournal(): javadsl.PersistenceTestKitReadJournal =
new javadsl.PersistenceTestKitReadJournal(scaladslReadJournal())

View file

@ -11,17 +11,27 @@ import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage
import akka.stream.scaladsl.Source
import akka.util.unused
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
object PersistenceTestKitReadJournal {
val Identifier = "akka.persistence.testkit.query"
}
final class PersistenceTestKitReadJournal(system: ExtendedActorSystem)
final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused config: Config, configPath: String)
extends ReadJournal
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery {
private val storage: EventStorage = InMemStorageExtension(system)
private val log = LoggerFactory.getLogger(getClass)
private val storage: EventStorage = {
// use shared path up to before `query` to identify which inmem journal we are addressing
val storagePluginId = configPath.replaceAll("""query$""", "journal")
log.debug("Using in memory storage [{}] for test kit read journal", storagePluginId)
InMemStorageExtension(system).storageFor(storagePluginId)
}
override def eventsByPersistenceId(
persistenceId: String,

View file

@ -431,7 +431,7 @@ class PersistenceTestKit(system: ActorSystem)
import PersistenceTestKit._
override protected val storage = InMemStorageExtension(system)
override protected val storage = InMemStorageExtension(system).storageFor(PersistenceTestKitPlugin.PluginId)
private final lazy val settings = Settings(system)

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object MultipleJournalsSpec {
object ListActor {
sealed trait Command
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
case class ShowMeWhatYouGot(replyTo: ActorRef[Set[String]]) extends Command
case object Stop extends Command
def apply(persistenceId: String, journal: String): Behavior[Command] =
EventSourcedBehavior[Command, String, Set[String]](
PersistenceId.ofUniqueId(persistenceId),
Set.empty[String],
(state, cmd) =>
cmd match {
case Save(text, replyTo) =>
Effect.persist(text).thenRun(_ => replyTo ! Done)
case ShowMeWhatYouGot(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
},
(state, evt) => state + evt).withJournalPluginId(journal)
}
def config = ConfigFactory.parseString(s"""
journal1 {
# journal and query expected to be next to each other under config path
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
query = $${akka.persistence.testkit.query}
}
journal2 {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
query = $${akka.persistence.testkit.query}
}
""").withFallback(ConfigFactory.load()).resolve()
}
class MultipleJournalsSpec
extends ScalaTestWithActorTestKit(MultipleJournalsSpec.config)
with AnyWordSpecLike
with LogCapturing {
import MultipleJournalsSpec._
"The testkit journal and query plugin" must {
"be possible to configure and use in multiple isolated instances" in {
val probe = createTestProbe[Any]()
{
// one actor in each journal with same id
val j1 = spawn(ListActor("id1", "journal1.journal"))
val j2 = spawn(ListActor("id1", "journal2.journal"))
j1 ! ListActor.Save("j1m1", probe.ref)
probe.receiveMessage()
j2 ! ListActor.Save("j2m1", probe.ref)
probe.receiveMessage()
j1 ! ListActor.Stop
probe.expectTerminated(j1)
j2 ! ListActor.Stop
probe.expectTerminated(j2)
}
{
// new incarnations in each journal with same id
val j1 = spawn(ListActor("id1", "journal1.journal"))
val j2 = spawn(ListActor("id1", "journal2.journal"))
// does not see each others events
j1 ! ListActor.ShowMeWhatYouGot(probe.ref)
probe.expectMessage(Set("j1m1"))
j2 ! ListActor.ShowMeWhatYouGot(probe.ref)
probe.expectMessage(Set("j2m1"))
}
val readJournal1 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal1.query")
val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query")
val eventsForJournal1 =
readJournal1.currentEventsByPersistenceId("id1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
eventsForJournal1.map(_.event) should ===(Seq("j1m1"))
val eventsForJournal2 =
readJournal2.currentEventsByPersistenceId("id1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
eventsForJournal2.map(_.event) should ===(Seq("j2m1"))
}
}
}

View file

@ -26,9 +26,13 @@ object ActiveActiveEventPublishingSpec {
case class Get(replyTo: ActorRef[Set[String]]) extends Command
case object Stop extends Command
def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] =
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
Behaviors.setup { ctx =>
ActiveActiveEventSourcing(entityId, replicaId, allReplicas, PersistenceTestKitReadJournal.Identifier)(
ActiveActiveEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier)(
aactx =>
EventSourcedBehavior[Command, String, Set[String]](
aactx.persistenceId,
@ -57,6 +61,10 @@ class ActiveActiveEventPublishingSpec
with AnyWordSpecLike
with LogCapturing {
val DCA = ReplicaId("DC-A")
val DCB = ReplicaId("DC-B")
val DCC = ReplicaId("DC-C")
private var idCounter = 0
def nextEntityId(): String = {
idCounter += 1
@ -68,18 +76,18 @@ class ActiveActiveEventPublishingSpec
"An active active actor" must {
"move forward when a published event from a replica is received" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -89,18 +97,18 @@ class ActiveActiveEventPublishingSpec
"ignore a published event from a replica is received but the sequence number is unexpected" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
2L, // missing 1L
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -110,18 +118,18 @@ class ActiveActiveEventPublishingSpec
"ignore a published event from an unknown replica" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-C"),
PersistenceId.replicatedUniqueId(id, DCC),
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-C", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -131,25 +139,25 @@ class ActiveActiveEventPublishingSpec
"ignore an already seen event from a replica" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId("myId4", "DC-B"),
PersistenceId.replicatedUniqueId("myId4", DCB),
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
1L,
"two-again", // ofc this would be the same in the real world, different just so we can detect
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -161,7 +169,7 @@ class ActiveActiveEventPublishingSpec
"handle published events after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehavior = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))
val activeActiveBehavior = MyActiveActive(id, DCA, Set(DCA, DCB))
val incarnation1 = spawn(activeActiveBehavior)
incarnation1 ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
@ -177,11 +185,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnation2 ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -193,18 +201,18 @@ class ActiveActiveEventPublishingSpec
"handle published events before and after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehaviorA = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))
val activeActiveBehaviorA = MyActiveActive(id, DCA, Set(DCA, DCB))
val incarnationA1 = spawn(activeActiveBehaviorA)
incarnationA1 ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnationA1 ! MyActiveActive.Stop
probe.expectTerminated(incarnationA1)
@ -213,11 +221,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(id, "DC-B"),
PersistenceId.replicatedUniqueId(id, DCB),
2L,
"three",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnationA2 ! MyActiveActive.Add("four", probe.ref)
probe.expectMessage(Done)

View file

@ -7,23 +7,27 @@ package akka.persistence.typed
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveSpec {
val AllReplicas = Set("R1", "R2", "R3")
val AllReplicas = Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3"))
sealed trait Command
case class GetState(replyTo: ActorRef[State]) extends Command
case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command
case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command
case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
case object Stop extends Command
case class State(all: List[String])
@ -34,7 +38,11 @@ object ActiveActiveSpec {
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ActiveActiveEventSourcing(entityId, replicaId, AllReplicas, PersistenceTestKitReadJournal.Identifier)(
ActiveActiveEventSourcing.withSharedJournal(
entityId,
ReplicaId(replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(
aaContext =>
EventSourcedBehavior[Command, String, State](
aaContext.persistenceId,
@ -61,7 +69,7 @@ object ActiveActiveSpec {
}
case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean, concurrent: Boolean)
case class EventAndContext(event: Any, origin: ReplicaId, recoveryRunning: Boolean, concurrent: Boolean)
class ActiveActiveSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
@ -145,10 +153,10 @@ class ActiveActiveSpec
"have access to replica information" in {
val entityId = nextEntityId
val probe = createTestProbe[(String, Set[String])]()
val probe = createTestProbe[(ReplicaId, Set[ReplicaId])]()
val r1 = spawn(testBehavior(entityId, "R1"))
r1 ! GetReplica(probe.ref)
probe.expectMessage(("R1", Set("R1", "R2", "R3")))
probe.expectMessage((ReplicaId("R1"), Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3"))))
}
"have access to event origin" in {
@ -161,12 +169,12 @@ class ActiveActiveSpec
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1", replyProbe.ref)
eventProbeR2.expectMessage(EventAndContext("from r1", "R1", false, false))
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", false, false))
eventProbeR2.expectMessage(EventAndContext("from r1", ReplicaId("R1"), false, false))
eventProbeR1.expectMessage(EventAndContext("from r1", ReplicaId("R1"), false, false))
r2 ! StoreMe("from r2", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", false, false))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2", false, false))
eventProbeR1.expectMessage(EventAndContext("from r2", ReplicaId("R2"), false, false))
eventProbeR2.expectMessage(EventAndContext("from r2", ReplicaId("R2"), false, false))
}
"set recovery running" in {
@ -175,12 +183,12 @@ class ActiveActiveSpec
val replyProbe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
r1 ! StoreMe("Event", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false, false))
eventProbeR1.expectMessage(EventAndContext("Event", ReplicaId("R1"), recoveryRunning = false, false))
replyProbe.expectMessage(Done)
val recoveryProbe = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R1", recoveryProbe.ref))
recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true, false))
recoveryProbe.expectMessage(EventAndContext("Event", ReplicaId("R1"), recoveryRunning = true, false))
}
"persist all" in {
@ -197,10 +205,10 @@ class ActiveActiveSpec
// events at r2 happened concurrently with events at r1
eventProbeR1.expectMessage(EventAndContext("1 from r1", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("2 from r1", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("1 from r2", "R2", false, concurrent = true))
eventProbeR1.expectMessage(EventAndContext("2 from r2", "R2", false, concurrent = true))
eventProbeR1.expectMessage(EventAndContext("1 from r1", ReplicaId("R1"), false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("2 from r1", ReplicaId("R1"), false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("1 from r2", ReplicaId("R2"), false, concurrent = true))
eventProbeR1.expectMessage(EventAndContext("2 from r2", ReplicaId("R2"), false, concurrent = true))
eventually {
val probe = createTestProbe[State]()
@ -225,12 +233,16 @@ class ActiveActiveSpec
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
// each gets its local event
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = false))
eventProbeR1.expectMessage(
EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false))
eventProbeR2.expectMessage(
EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = false))
// then the replicated remote events, which will be concurrent
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true))
eventProbeR2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = true))
eventProbeR1.expectMessage(
EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = true))
eventProbeR2.expectMessage(
EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = true))
// state is updated
eventually {
@ -246,11 +258,11 @@ class ActiveActiveSpec
// Neither of these should be concurrent, nothing happening at r2
r1 ! StoreMe("from r1 2", probe.ref) // R1 1 R2 1
eventProbeR1.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, concurrent = false))
r1 ! StoreMe("from r1 3", probe.ref) // R2 2 R2 1
eventProbeR1.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("from r1 3", ReplicaId("R1"), false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 3", ReplicaId("R1"), false, concurrent = false))
eventually {
val probe = createTestProbe[State]()
r2 ! GetState(probe.ref)
@ -259,8 +271,8 @@ class ActiveActiveSpec
// not concurrent as the above asserts mean that all events are fully replicated
r2 ! StoreMe("from r2 2", probe.ref)
eventProbeR1.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, concurrent = false))
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
@ -286,8 +298,8 @@ class ActiveActiveSpec
probe.expectMessage(Done)
// r2
eventProbeR2.expectMessage(EventAndContext("from r1 1", "R1", false, false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, false))
eventProbeR2.expectMessage(EventAndContext("from r1 1", ReplicaId("R1"), false, false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, false))
r2 ! StoreMe("from r2 1", probe.ref)
probe.expectMessage(Done)
@ -297,10 +309,10 @@ class ActiveActiveSpec
// r3 should only get the events 1, not R2s stored version of them
val eventProbeR3 = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R3", eventProbeR3.ref))
eventProbeR3.expectMessage(EventAndContext("from r1 1", "R1", false, false))
eventProbeR3.expectMessage(EventAndContext("from r1 2", "R1", false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 1", "R2", false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 2", "R2", false, false))
eventProbeR3.expectMessage(EventAndContext("from r1 1", ReplicaId("R1"), false, false))
eventProbeR3.expectMessage(EventAndContext("from r1 2", ReplicaId("R1"), false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 1", ReplicaId("R2"), false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 2", ReplicaId("R2"), false, false))
eventProbeR3.expectNoMessage()
}
@ -313,14 +325,18 @@ class ActiveActiveSpec
r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
// local event isn't concurrent, remote event is
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true))
eventProbeR1.expectMessage(
EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false))
eventProbeR1.expectMessage(
EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = false, concurrent = true))
// take 2
val eventProbeR1Take2 = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R1", eventProbeR1Take2.ref))
eventProbeR1Take2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = true, concurrent = false))
eventProbeR1Take2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = true, concurrent = true))
eventProbeR1Take2.expectMessage(
EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = true, concurrent = false))
eventProbeR1Take2.expectMessage(
EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = true, concurrent = true))
}
}

View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object MultiJournalActiveActiveSpec {
object Actor {
sealed trait Command
case class GetState(replyTo: ActorRef[Set[String]]) extends Command
case class StoreMe(text: String, ack: ActorRef[Done]) extends Command
private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal")
def apply(entityId: String, replicaId: String): Behavior[Command] = {
ActiveActiveEventSourcing(
entityId,
ReplicaId(replicaId),
Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))(
aaContext =>
EventSourcedBehavior[Command, String, Set[String]](
aaContext.persistenceId,
Set.empty[String],
(state, command) =>
command match {
case GetState(replyTo) =>
replyTo ! state
Effect.none
case StoreMe(evt, ack) =>
Effect.persist(evt).thenRun(_ => ack ! Done)
},
(state, event) => state + event)).withJournalPluginId(writeJournalPerReplica(replicaId))
}
}
def separateJournalsConfig: Config = ConfigFactory.parseString(s"""
journal1 {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
query = $${akka.persistence.testkit.query}
}
journal2 {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
query = $${akka.persistence.testkit.query}
}
""").withFallback(ConfigFactory.load()).resolve()
}
class MultiJournalActiveActiveSpec
extends ScalaTestWithActorTestKit(MultiJournalActiveActiveSpec.separateJournalsConfig)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import MultiJournalActiveActiveSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
"ActiveActiveEventSourcing" should {
"support one journal per replica" in {
val r1 = spawn(Actor("id1", "R1"))
val r2 = spawn(Actor("id1", "R2"))
val probe = createTestProbe[Any]()
r1 ! Actor.StoreMe("r1 m1", probe.ref)
probe.expectMessage(Done)
r2 ! Actor.StoreMe("r2 m1", probe.ref)
probe.expectMessage(Done)
eventually {
val probe = createTestProbe[Set[String]]()
r1 ! Actor.GetState(probe.ref)
probe.receiveMessage() should ===(Set("r1 m1", "r2 m1"))
r2 ! Actor.GetState(probe.ref)
probe.receiveMessage() should ===(Set("r1 m1", "r2 m1"))
}
val readJournal1 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal1.query")
val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query")
val eventsForJournal1 =
readJournal1.currentEventsByPersistenceId("id1|R1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
eventsForJournal1.map(_.event).toSet should ===(Set("r1 m1", "r2 m1"))
val eventsForJournal2 =
readJournal2.currentEventsByPersistenceId("id1|R2", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
eventsForJournal2.map(_.event).toSet should ===(Set("r1 m1", "r2 m1"))
}
}
}

View file

@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
@ -21,7 +22,7 @@ object AAAuctionExampleSpec {
type MoneyAmount = Int
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId)
// commands
sealed trait AuctionCommand
@ -33,12 +34,13 @@ object AAAuctionExampleSpec {
sealed trait AuctionEvent extends CborSerializable
final case class BidRegistered(bid: Bid) extends AuctionEvent
final case class AuctionFinished(atDc: String) extends AuctionEvent
final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
final case class AuctionFinished(atDc: ReplicaId) extends AuctionEvent
final case class WinnerDecided(atDc: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount)
extends AuctionEvent
sealed trait AuctionPhase
case object Running extends AuctionPhase
final case class Closing(finishedAtDc: Set[String]) extends AuctionPhase
final case class Closing(finishedAtDc: Set[ReplicaId]) extends AuctionPhase
case object Closed extends AuctionPhase
case class AuctionState(
@ -85,8 +87,8 @@ object AAAuctionExampleSpec {
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(
second.originDc) < 0)
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id
.compareTo(second.originDc.id) < 0)
}
case class AuctionSetup(
@ -94,7 +96,7 @@ object AAAuctionExampleSpec {
initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner
closingAt: Instant,
responsibleForClosing: Boolean,
allDcs: Set[String])
allDcs: Set[ReplicaId])
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)(
state: AuctionState,
@ -199,15 +201,16 @@ object AAAuctionExampleSpec {
def initialState(setup: AuctionSetup) =
AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer)
def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx =>
ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
EventSourcedBehavior(
aaCtx.persistenceId,
initialState(setup),
commandHandler(setup, ctx, aaCtx),
eventHandler(ctx, aaCtx, setup))
}
ActiveActiveEventSourcing
.withSharedJournal(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
EventSourcedBehavior(
aaCtx.persistenceId,
initialState(setup),
commandHandler(setup, ctx, aaCtx),
eventHandler(ctx, aaCtx, setup))
}
}
}
@ -223,19 +226,19 @@ class AAAuctionExampleSpec
"Auction example" should {
"work" in {
val Replicas = Set("DC-A", "DC-B")
val Replicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B"))
val setupA =
AuctionSetup(
"old-skis",
Bid("chbatey", 12, Instant.now(), "DC-A"),
Bid("chbatey", 12, Instant.now(), ReplicaId("DC-A")),
Instant.now().plusSeconds(10),
responsibleForClosing = true,
Replicas)
val setupB = setupA.copy(responsibleForClosing = false)
val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-A", setupA))
val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-B", setupB))
val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-A"), setupA))
val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-B"), setupB))
dcAReplica ! OfferBid("me", 100)
dcAReplica ! OfferBid("me", 99)

View file

@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
@ -24,7 +25,7 @@ object AABlogExampleSpec {
copy(content = Some(newContent), contentTimestamp = timestamp)
def isEmpty: Boolean = content.isEmpty
}
val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ""), published = false)
val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false)
final case class PostContent(title: String, body: String)
final case class PostSummary(postId: String, title: String)
@ -110,20 +111,30 @@ class AABlogExampleSpec
"Blog Example" should {
"work" in {
val refDcA: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) {
(aa: ActiveActiveContext) =>
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing.withSharedJournal(
"cat",
ReplicaId("DC-A"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) =>
behavior(aa, ctx)
}
}, "dc-a")
}
},
"dc-a")
val refDcB: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) {
(aa: ActiveActiveContext) =>
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing.withSharedJournal(
"cat",
ReplicaId("DC-B"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) =>
behavior(aa, ctx)
}
}, "dc-b")
}
},
"dc-b")
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

View file

@ -130,16 +130,16 @@ object PersistenceId {
* Constructs a persistence id from a unique entity id that includes the replica id.
*/
@InternalApi
private[akka] def replicatedUniqueId(entityId: String, replicaId: String): PersistenceId = {
private[akka] def replicatedUniqueId(entityId: String, replicaId: ReplicaId): PersistenceId = {
if (entityId.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character")
if (replicaId.contains(DefaultSeparator))
if (replicaId.id.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"replicaId [$replicaId] contains [$DefaultSeparator] which is a reserved character")
s"replicaId [${replicaId.id}] contains [$DefaultSeparator] which is a reserved character")
new PersistenceId(entityId + DefaultSeparator + replicaId)
new PersistenceId(entityId + DefaultSeparator + replicaId.id)
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Identifies a replica in Active Active eventsourcing, could be a datacenter name or a logical identifier.
*/
final case class ReplicaId(id: String)

View file

@ -12,6 +12,7 @@ import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter }
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }
@ -61,7 +62,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val replicaId: Option[String] = activeActive.map(_.replicaId)
val replicaId: Option[ReplicaId] = activeActive.map(_.replicaId)
def selfClassic: ClassicActorRef = context.self.toClassic

View file

@ -35,6 +35,7 @@ import akka.persistence.typed.EventAdapter
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
@ -252,10 +253,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
override private[akka] def withActiveActive(
context: ActiveActiveContextImpl,
id: String,
allIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State] = {
copy(activeActive = Some(ActiveActive(id, allIds, context, queryPluginId)))
replicaId: ReplicaId,
allReplicaIdsAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State] = {
copy(activeActive = Some(ActiveActive(replicaId, allReplicaIdsAndQueryPlugins, context)))
}
}
@ -282,7 +282,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
*/
@InternalApi
private[akka] final case class ReplicatedEventMetaData(
originReplica: String,
originReplica: ReplicaId,
originSequenceNr: Long,
version: VersionVector,
concurrent: Boolean) // whether when the event handler was executed the event was concurrent
@ -296,13 +296,13 @@ private[akka] final case class ReplicatedEventMetaData(
@InternalApi
private[akka] final case class ReplicatedEvent[E](
event: E,
originReplica: String,
originReplica: ReplicaId,
originSequenceNr: Long,
originVersion: VersionVector)
@InternalApi
private[akka] case object ReplicatedEventAck
final class ReplicatedPublishedEventMetaData(val replicaId: String, private[akka] val version: VersionVector)
final class ReplicatedPublishedEventMetaData(val replicaId: ReplicaId, private[akka] val version: VersionVector)
/**
* INTERNAL API

View file

@ -18,6 +18,7 @@ import akka.persistence.typed.EmptyEventSeq
import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
@ -52,7 +53,7 @@ private[akka] object ReplayingEvents {
receivedPoisonPill: Boolean,
recoveryStartTime: Long,
version: VersionVector,
seenSeqNrPerReplica: Map[String, Long])
seenSeqNrPerReplica: Map[ReplicaId, Long])
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
Behaviors.setup { _ =>
@ -121,7 +122,7 @@ private[akka] final class ReplayingEvents[C, E, S](
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr)
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, String)] =
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, ReplicaId)] =
setup.activeActive match {
case Some(aa) =>
val meta = repr.metadata match {

View file

@ -34,6 +34,7 @@ import akka.persistence.SnapshotProtocol
import akka.persistence.journal.Tagged
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.{
DeleteEventsCompleted,
DeleteEventsFailed,
@ -91,8 +92,8 @@ private[akka] object Running {
state: State,
receivedPoisonPill: Boolean,
version: VersionVector,
seenPerReplica: Map[String, Long],
replicationControl: Map[String, ReplicationStreamControl]) {
seenPerReplica: Map[ReplicaId, Long],
replicationControl: Map[ReplicaId, ReplicationStreamControl]) {
def nextSequenceNr(): RunningState[State] =
copy(seqNr = seqNr + 1)
@ -128,8 +129,8 @@ private[akka] object Running {
if (replicaId != aa.replicaId) {
val seqNr = state.seenPerReplica(replicaId)
val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replicaId)
// FIXME support different configuration per replica https://github.com/akka/akka/issues/29257
val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId)
val queryPluginId = aa.allReplicasAndQueryPlugins(replicaId)
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
implicit val timeout = Timeout(30.seconds)
@ -424,7 +425,7 @@ private[akka] object Running {
val newState2 = setup.activeActive match {
case Some(aa) =>
val updatedVersion = newState.version.updated(aa.replicaId, _currentSequenceNumber)
val updatedVersion = newState.version.updated(aa.replicaId.id, _currentSequenceNumber)
val r = internalPersist(
setup.context,
cmd,
@ -475,7 +476,7 @@ private[akka] object Running {
val adaptedEvent = adaptEvent(event)
val eventMetadata = metadataTemplate match {
case Some(template) =>
val updatedVersion = currentState.version.updated(template.originReplica, _currentSequenceNumber)
val updatedVersion = currentState.version.updated(template.originReplica.id, _currentSequenceNumber)
setup.log.trace("Processing event [{}] with version vector [{}]", event, updatedVersion)
currentState = currentState.copy(version = updatedVersion)
Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion))

View file

@ -5,20 +5,21 @@
package akka.persistence.typed.scaladsl
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.util.WallClock
/**
* Utility class for comparing timestamp and data center
* identifier when implementing last-writer wins.
*/
final case class LwwTime(timestamp: Long, originDc: String) {
final case class LwwTime(timestamp: Long, originDc: ReplicaId) {
/**
* Create a new `LwwTime` that has a `timestamp` that is
* `max` of the given timestamp and previous timestamp + 1,
* i.e. monotonically increasing.
*/
def increase(t: Long, replicaId: String): LwwTime =
def increase(t: Long, replicaId: ReplicaId): LwwTime =
LwwTime(math.max(timestamp + 1, t), replicaId)
/**
@ -30,7 +31,7 @@ final case class LwwTime(timestamp: Long, originDc: String) {
def isAfter(other: LwwTime): Boolean = {
if (timestamp > other.timestamp) true
else if (timestamp < other.timestamp) false
else if (other.originDc.compareTo(originDc) > 0) true
else if (other.originDc.id.compareTo(originDc.id) > 0) true
else false
}
}
@ -38,10 +39,10 @@ final case class LwwTime(timestamp: Long, originDc: String) {
// FIXME docs
trait ActiveActiveContext {
def origin: String
def origin: ReplicaId
def concurrent: Boolean
def replicaId: String
def allReplicas: Set[String]
def replicaId: ReplicaId
def allReplicas: Set[ReplicaId]
def persistenceId: PersistenceId
def recoveryRunning: Boolean
def entityId: String
@ -52,9 +53,13 @@ trait ActiveActiveContext {
// FIXME, parts of this can be set during initialisation
// Other fields will be set before executing the event handler as they change per event
// https://github.com/akka/akka/issues/29258
private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String])
private[akka] class ActiveActiveContextImpl(
val entityId: String,
val replicaId: ReplicaId,
val replicasAndQueryPlugins: Map[ReplicaId, String])
extends ActiveActiveContext {
var _origin: String = null
val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet
var _origin: ReplicaId = null
var _recoveryRunning: Boolean = false
var _concurrent: Boolean = false
@ -64,7 +69,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId:
* The origin of the current event.
* Undefined result if called from anywhere other than an event handler.
*/
override def origin: String = _origin
override def origin: ReplicaId = _origin
/**
* Whether the happened concurrently with an event from another replica.
@ -83,7 +88,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId:
object ActiveActiveEventSourcing {
/**
* Initialize a replicated event sourced behavior.
* Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal.
*
* Events from each replica for the same entityId will be replicated to every copy.
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
@ -93,21 +98,43 @@ object ActiveActiveEventSourcing {
* A different journal plugin id can be configured using withJournalPluginId after creation. Different databases
* can be used for each replica.
* The events from other replicas are read using PersistentQuery.
* TODO support a different query plugin id per replicas: https://github.com/akka/akka/issues/29257
*
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
* @param queryPluginId Used to read the events from other replicas. Must be the query side of your configured journal plugin.
* @return
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
*/
def withSharedJournal[Command, Event, State](
entityId: String,
replicaId: ReplicaId,
allReplicaIds: Set[ReplicaId],
queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] =
apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(activeActiveContext)
/**
* Initialize a replicated event sourced behavior.
*
* Events from each replica for the same entityId will be replicated to every copy.
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
*
* Using an replicated event sourced behavior means there is no longer the single writer guarantee.
*
* The journal plugin id for the entity itself can be configured using withJournalPluginId after creation.
* A query side identifier is passed per replica allowing for separate database/journal configuration per
* replica. The events from other replicas are read using PersistentQuery.
*
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
* and configured with the query plugin for the journal that each replica uses.
*/
def apply[Command, Event, State](
entityId: String,
replicaId: String,
allReplicaIds: Set[String],
queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String])(
activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicaIds)
activeActiveContext(context).withActiveActive(context, replicaId, allReplicaIds, queryPluginId)
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins)
activeActiveContext(context).withActiveActive(context, replicaId, allReplicasAndQueryPlugins)
}
}

View file

@ -16,6 +16,7 @@ import akka.annotation.ApiMayChange
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal._
@ -25,15 +26,16 @@ object EventSourcedBehavior {
// FIXME move to internal
@InternalApi
private[akka] final case class ActiveActive(
replicaId: String,
allReplicas: Set[String],
aaContext: ActiveActiveContextImpl,
queryPluginId: String) {
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String],
aaContext: ActiveActiveContextImpl) {
val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet
/**
* Must only be called on the same thread that will execute the user code
*/
def setContext(recoveryRunning: Boolean, originReplica: String, concurrent: Boolean): Unit = {
def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = {
aaContext._recoveryRunning = recoveryRunning
aaContext._concurrent = concurrent
aaContext._origin = originReplica
@ -166,9 +168,8 @@ object EventSourcedBehavior {
private[akka] def withActiveActive(
context: ActiveActiveContextImpl,
replicaId: String,
allReplicaIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State]
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String]): EventSourcedBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.