Rename active active to replicated event sourcing (#29435)

* Move active active internals into the internal package

* Renaming active active to Replicated Event Sourcing

* Rename of Active Active to Replicated Event Sourcing

* Revert changes to testkit

* Java test formatting...

* Gave up on getting apidoc to link to EventSourcedBehavior and made code snippets of it

Co-authored-by: Christopher Batey <christopher.batey@gmail.com>
This commit is contained in:
Johan Andrén 2020-07-29 12:12:48 +02:00 committed by Christopher Batey
parent 0b11ae362c
commit b03412d5b2
48 changed files with 1871 additions and 1439 deletions

View file

@ -9,7 +9,7 @@ import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.internal.ActiveActiveShardingExtensionImpl
import akka.cluster.sharding.typed.internal.ReplicatedShardingExtensionImpl
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.persistence.typed.ReplicaId
import java.util.{ Map => JMap }
@ -17,16 +17,16 @@ import java.util.{ Map => JMap }
import akka.actor.typed.ActorRef
/**
* Extension for running active active in sharding by starting one separate instance of sharding per replica.
* Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica.
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
*/
@ApiMayChange
object ActiveActiveShardingExtension extends ExtensionId[ActiveActiveShardingExtension] {
object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtension] {
override def createExtension(system: ActorSystem[_]): ActiveActiveShardingExtension =
new ActiveActiveShardingExtensionImpl(system)
override def createExtension(system: ActorSystem[_]): ReplicatedShardingExtension =
new ReplicatedShardingExtensionImpl(system)
def get(system: ActorSystem[_]): ActiveActiveShardingExtension = apply(system)
def get(system: ActorSystem[_]): ReplicatedShardingExtension = apply(system)
}
@ -35,27 +35,27 @@ object ActiveActiveShardingExtension extends ExtensionId[ActiveActiveShardingExt
*/
@DoNotInherit
@ApiMayChange
trait ActiveActiveShardingExtension extends Extension {
trait ReplicatedShardingExtension extends Extension {
/**
* Init one instance sharding per replica in the given settings and return a [[ActiveActiveSharding]] representing those.
* Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those.
*
* @tparam M The type of messages the active active event sourced actor accepts
* @tparam M The type of messages the replicated event sourced actor accepts
* @tparam E The type of envelope used for routing messages to actors, the same for all replicas
*
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ActiveActiveSharding]]
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
*/
def init[M, E](settings: ActiveActiveShardingSettings[M, E]): ActiveActiveSharding[M, E]
def init[M, E](settings: ReplicatedShardingSettings[M, E]): ReplicatedSharding[M, E]
}
/**
* Represents the sharding instances for the replicas of one active active entity type
* Represents the sharding instances for the replicas of one replicated event sourcing entity type
*
* Not for user extension.
*/
@DoNotInherit
@ApiMayChange
trait ActiveActiveSharding[M, E] {
trait ReplicatedSharding[M, E] {
/**
* Scala API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined

View file

@ -18,12 +18,12 @@ import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
@ApiMayChange
object ActiveActiveShardingSettings {
object ReplicatedShardingSettings {
/**
* Java API:
*
* @tparam M The type of messages the active active entity accepts
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def create[M, E](
@ -33,7 +33,7 @@ object ActiveActiveShardingSettings {
JEntityTypeKey[M],
ReplicaId,
JSet[ReplicaId],
ReplicaSettings[M, E]]): ActiveActiveShardingSettings[M, E] = {
ReplicaSettings[M, E]]): ReplicatedShardingSettings[M, E] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) =>
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds))
@ -42,13 +42,13 @@ object ActiveActiveShardingSettings {
/**
* Scala API:
*
* @tparam M The type of messages the active active entity accepts
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicaSettings[M, E])
: ActiveActiveShardingSettings[M, E] = {
new ActiveActiveShardingSettings(allReplicaIds.map { replicaId =>
: ReplicatedShardingSettings[M, E] = {
new ReplicatedShardingSettings(allReplicaIds.map { replicaId =>
val typeKey = EntityTypeKey[M](replicaId.id)
settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds)
}.toVector, directReplication = false)
@ -56,23 +56,23 @@ object ActiveActiveShardingSettings {
}
/**
* @tparam M The type of messages the active active entity accepts
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
@ApiMayChange
final class ActiveActiveShardingSettings[M, E] private (
final class ReplicatedShardingSettings[M, E] private (
val replicas: immutable.Seq[ReplicaSettings[M, E]],
val directReplication: Boolean) {
/**
* Start direct replication over sharding when active active sharding starts up, requires the entities
* Start direct replication over sharding when replicated sharding starts up, requires the entities
* to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]]
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
* to work.
*
*/
def withDirectReplication(): ActiveActiveShardingSettings[M, E] =
new ActiveActiveShardingSettings(replicas, directReplication = true)
def withDirectReplication(): ReplicatedShardingSettings[M, E] =
new ReplicatedShardingSettings(replicas, directReplication = true)
}
@ -81,7 +81,7 @@ object ReplicaSettings {
/**
* Java API: Defines the [[akka.cluster.sharding.typed.javadsl.Entity]] to use for a given replica, note that the behavior
* can be a [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior]] or an arbitrary non persistent
* can be a [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior]] or an arbitrary non persistent
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
@ -90,7 +90,7 @@ object ReplicaSettings {
/**
* Scala API: Defines the [[akka.cluster.sharding.typed.scaladsl.Entity]] to use for a given replica, note that the behavior
* can be a behavior created with [[akka.persistence.typed.scaladsl.ActiveActiveEventSourcing]] or an arbitrary non persistent
* can be a behavior created with [[akka.persistence.typed.scaladsl.ReplicatedEventSourcing]] or an arbitrary non persistent
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
@ -99,7 +99,7 @@ object ReplicaSettings {
}
/**
* Settings for a specific replica id in active active sharding
* Settings for a specific replica id in replicated sharding
*/
@ApiMayChange
final class ReplicaSettings[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E])

View file

@ -18,13 +18,13 @@ import akka.persistence.typed.ReplicaId
import akka.util.ccompat.JavaConverters._
/**
* Used when sharding Active Active entities in multiple instances of sharding, for example one per DC in a Multi DC
* Used when sharding Replicated Event Sourced entities in multiple instances of sharding, for example one per DC in a Multi DC
* Akka Cluster.
*
* This actor should be started once on each node where Active Active entities will run (the same nodes that you start
* This actor should be started once on each node where Replicated Event Sourced entities will run (the same nodes that you start
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]]
* If using [[ActiveActiveSharding]] the replication can be enabled through [[ActiveActiveShardingSettings#withDirectReplication()]]
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
* If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings#withDirectReplication()]]
* instead of starting this actor manually.
*
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas
@ -36,7 +36,7 @@ import akka.util.ccompat.JavaConverters._
* by default and with a custom extractor since the envelopes are handled internally.
*/
@ApiMayChange
object ActiveActiveShardingDirectReplication {
object ShardingDirectReplication {
/**
* Not for user extension

View file

@ -10,9 +10,9 @@ import java.util.{ Map => JMap }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.ActiveActiveShardingExtension
import akka.cluster.sharding.typed.ActiveActiveSharding
import akka.cluster.sharding.typed.ActiveActiveShardingSettings
import akka.cluster.sharding.typed.ReplicatedShardingExtension
import akka.cluster.sharding.typed.ReplicatedSharding
import akka.cluster.sharding.typed.ReplicatedShardingSettings
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
@ -20,7 +20,7 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import org.slf4j.LoggerFactory
import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.sharding.typed.ActiveActiveShardingDirectReplication
import akka.cluster.sharding.typed.ShardingDirectReplication
import akka.util.ccompat.JavaConverters._
@ -28,19 +28,18 @@ import akka.util.ccompat.JavaConverters._
* INTERNAL API
*/
@InternalApi
private[akka] final class ActiveActiveShardingExtensionImpl(system: ActorSystem[_])
extends ActiveActiveShardingExtension {
private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]) extends ReplicatedShardingExtension {
private val counter = new AtomicLong(0)
private val logger = LoggerFactory.getLogger(getClass)
override def init[M, E](settings: ActiveActiveShardingSettings[M, E]): ActiveActiveSharding[M, E] = {
override def init[M, E](settings: ReplicatedShardingSettings[M, E]): ReplicatedSharding[M, E] = {
val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map { replicaSettings =>
// start up a sharding instance per replica id
logger.infoN(
"Starting Active Active sharding for replica [{}] (ShardType: [{}])",
"Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])",
replicaSettings.replicaId.id,
replicaSettings.entity.typeKey.name)
val regionOrProxy = sharding.init(replicaSettings.entity)
@ -50,14 +49,14 @@ private[akka] final class ActiveActiveShardingExtensionImpl(system: ActorSystem[
case (id, _, regionOrProxy) => id -> regionOrProxy
}.toMap
if (settings.directReplication) {
logger.infoN("Starting Active Active Direct Replication")
logger.infoN("Starting Replicated Event Sourcing Direct Replication")
system.systemActorOf(
ActiveActiveShardingDirectReplication(replicaToRegionOrProxy),
s"activeActiveDirectReplication-${counter.incrementAndGet()}")
ShardingDirectReplication(replicaToRegionOrProxy),
s"directReplication-${counter.incrementAndGet()}")
}
val replicaToTypeKey = initializedReplicas.map { case (id, typeKey, _) => id -> typeKey }.toMap
new ActiveActiveShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey)
new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey)
}
}
@ -65,11 +64,11 @@ private[akka] final class ActiveActiveShardingExtensionImpl(system: ActorSystem[
* INTERNAL API
*/
@InternalApi
private[akka] final class ActiveActiveShardingImpl[M, E](
private[akka] final class ReplicatedShardingImpl[M, E](
sharding: ClusterSharding,
shardingPerReplica: Map[ReplicaId, ActorRef[E]],
replicaTypeKeys: Map[ReplicaId, EntityTypeKey[M]])
extends ActiveActiveSharding[M, E] {
extends ReplicatedSharding[M, E] {
override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica
override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava

View file

@ -27,18 +27,16 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import scala.util.Random;
import java.util.*;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
public class ActiveActiveShardingTest extends JUnitSuite {
public class ReplicatedShardingTest extends JUnitSuite {
static class MyActiveActiveStringSet
extends ActiveActiveEventSourcedBehavior<
MyActiveActiveStringSet.Command, String, Set<String>> {
static class MyReplicatedStringSet
extends ReplicatedEventSourcedBehavior<MyReplicatedStringSet.Command, String, Set<String>> {
interface Command {}
static class Add implements Command {
@ -67,16 +65,16 @@ public class ActiveActiveShardingTest extends JUnitSuite {
static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ActiveActiveEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
MyActiveActiveStringSet::new);
MyReplicatedStringSet::new);
}
private MyActiveActiveStringSet(ActiveActiveContext activeActiveContext) {
super(activeActiveContext);
private MyReplicatedStringSet(ReplicationContext replicationContext) {
super(replicationContext);
}
@Override
@ -116,9 +114,9 @@ public class ActiveActiveShardingTest extends JUnitSuite {
public static final class ForwardToRandom implements Command {
public final String entityId;
public final MyActiveActiveStringSet.Command message;
public final MyReplicatedStringSet.Command message;
public ForwardToRandom(String entityId, MyActiveActiveStringSet.Command message) {
public ForwardToRandom(String entityId, MyReplicatedStringSet.Command message) {
this.entityId = entityId;
this.message = message;
}
@ -126,9 +124,9 @@ public class ActiveActiveShardingTest extends JUnitSuite {
public static final class ForwardToAll implements Command {
public final String entityId;
public final MyActiveActiveStringSet.Command message;
public final MyReplicatedStringSet.Command message;
public ForwardToAll(String entityId, MyActiveActiveStringSet.Command message) {
public ForwardToAll(String entityId, MyReplicatedStringSet.Command message) {
this.entityId = entityId;
this.message = message;
}
@ -144,19 +142,19 @@ public class ActiveActiveShardingTest extends JUnitSuite {
Arrays.asList(
new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C"))));
private final ActiveActiveSharding<
MyActiveActiveStringSet.Command, ShardingEnvelope<MyActiveActiveStringSet.Command>>
private final ReplicatedSharding<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
aaSharding;
private ProxyActor(ActorContext<Command> context) {
super(context);
// #bootstrap
ActiveActiveShardingSettings<
MyActiveActiveStringSet.Command, ShardingEnvelope<MyActiveActiveStringSet.Command>>
ReplicatedShardingSettings<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
aaShardingSettings =
ActiveActiveShardingSettings.create(
MyActiveActiveStringSet.Command.class,
ReplicatedShardingSettings.create(
MyReplicatedStringSet.Command.class,
ALL_REPLICAS,
// factory for replica settings for a given replica
(entityTypeKey, replicaId, allReplicas) ->
@ -168,7 +166,7 @@ public class ActiveActiveShardingTest extends JUnitSuite {
entityTypeKey,
entityContext ->
// factory for the entity for a given entity in that replica
MyActiveActiveStringSet.create(
MyReplicatedStringSet.create(
entityContext.getEntityId(), replicaId, allReplicas))
// potentially use replica id as role or dc in Akka multi dc for the
// sharding instance
@ -176,8 +174,8 @@ public class ActiveActiveShardingTest extends JUnitSuite {
// .withDataCenter(replicaId.id()))
.withRole(replicaId.id())));
ActiveActiveShardingExtension extension =
ActiveActiveShardingExtension.get(getContext().getSystem());
ReplicatedShardingExtension extension =
ReplicatedShardingExtension.get(getContext().getSystem());
aaSharding = extension.init(aaShardingSettings);
// #bootstrap
}
@ -191,7 +189,7 @@ public class ActiveActiveShardingTest extends JUnitSuite {
}
private Behavior<Command> onForwardToRandom(ForwardToRandom forwardToRandom) {
Map<ReplicaId, EntityRef<MyActiveActiveStringSet.Command>> refs =
Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToRandom.entityId);
int chosenIdx = new java.util.Random().nextInt(refs.size());
new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message);
@ -200,7 +198,7 @@ public class ActiveActiveShardingTest extends JUnitSuite {
private Behavior<Command> onForwardToAll(ForwardToAll forwardToAll) {
// #all-entity-refs
Map<ReplicaId, EntityRef<MyActiveActiveStringSet.Command>> refs =
Map<ReplicaId, EntityRef<MyReplicatedStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToAll.entityId);
refs.forEach((replicaId, ref) -> ref.tell(forwardToAll.message));
// #all-entity-refs
@ -238,16 +236,16 @@ public class ActiveActiveShardingTest extends JUnitSuite {
// forward messages to replicas
ActorRef<ProxyActor.Command> proxy = testKit.spawn(ProxyActor.create());
proxy.tell(new ProxyActor.ForwardToAll("id1", new MyActiveActiveStringSet.Add("to-all")));
proxy.tell(new ProxyActor.ForwardToRandom("id1", new MyActiveActiveStringSet.Add("to-random")));
proxy.tell(new ProxyActor.ForwardToAll("id1", new MyReplicatedStringSet.Add("to-all")));
proxy.tell(new ProxyActor.ForwardToRandom("id1", new MyReplicatedStringSet.Add("to-random")));
testProbe.awaitAssert(
() -> {
TestProbe<MyActiveActiveStringSet.Texts> responseProbe = testKit.createTestProbe();
TestProbe<MyReplicatedStringSet.Texts> responseProbe = testKit.createTestProbe();
proxy.tell(
new ProxyActor.ForwardToAll(
"id1", new MyActiveActiveStringSet.GetTexts(responseProbe.ref())));
List<MyActiveActiveStringSet.Texts> responses = responseProbe.receiveSeveralMessages(3);
"id1", new MyReplicatedStringSet.GetTexts(responseProbe.ref())));
List<MyReplicatedStringSet.Texts> responses = responseProbe.receiveSeveralMessages(3);
Set<String> uniqueTexts =
responses.stream().flatMap(res -> res.texts.stream()).collect(Collectors.toSet());
assertEquals(new HashSet<>(Arrays.asList("to-all", "to-random")), uniqueTexts);

View file

@ -15,12 +15,9 @@ import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.ReplicaId
class ActiveActiveShardingDirectReplicationSpec
extends ScalaTestWithActorTestKit
with AnyWordSpecLike
with LogCapturing {
class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
"Active active sharding replication" must {
"Replicated sharding direct replication" must {
"replicate published events to all sharding proxies" in {
val replicaAProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]()
@ -28,7 +25,7 @@ class ActiveActiveShardingDirectReplicationSpec
val replicaCProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]()
val replicationActor = spawn(
ActiveActiveShardingDirectReplication(
ShardingDirectReplication(
typed.ReplicaId("ReplicaA"),
replicaShardingProxies = Map(
ReplicaId("ReplicaA") -> replicaAProbe.ref,
@ -36,7 +33,7 @@ class ActiveActiveShardingDirectReplicationSpec
ReplicaId("ReplicaC") -> replicaCProbe.ref)))
val upProbe = createTestProbe[Done]()
replicationActor ! ActiveActiveShardingDirectReplication.VerifyStarted(upProbe.ref)
replicationActor ! ShardingDirectReplication.VerifyStarted(upProbe.ref)
upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough
val event = PublishedEventImpl(

View file

@ -16,7 +16,7 @@ import akka.cluster.typed.Join
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
@ -25,7 +25,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import scala.util.Random
object ActiveActiveShardingSpec {
object ReplicatedShardingSpec {
def config = ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
@ -36,19 +36,19 @@ object ActiveActiveShardingSpec {
akka.remote.artery.canonical.port = 0""").withFallback(PersistenceTestKitPlugin.config)
}
class ActiveActiveShardingSpec
extends ScalaTestWithActorTestKit(ActiveActiveShardingSpec.config)
class ReplicatedShardingSpec
extends ScalaTestWithActorTestKit(ReplicatedShardingSpec.config)
with AnyWordSpecLike
with LogCapturing {
object MyActiveActiveStringSet {
object MyReplicatedStringSet {
trait Command extends CborSerializable
case class Add(text: String) extends Command
case class GetTexts(replyTo: ActorRef[Texts]) extends Command
case class Texts(texts: Set[String]) extends CborSerializable
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
@ -70,15 +70,13 @@ class ActiveActiveShardingSpec
object ProxyActor {
sealed trait Command
case class ForwardToRandom(entityId: String, msg: MyActiveActiveStringSet.Command) extends Command
case class ForwardToAll(entityId: String, msg: MyActiveActiveStringSet.Command) extends Command
case class ForwardToRandom(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
case class ForwardToAll(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
def apply(): Behavior[Command] = Behaviors.setup { context =>
// #bootstrap
val aaShardingSettings =
ActiveActiveShardingSettings[
MyActiveActiveStringSet.Command,
ShardingEnvelope[MyActiveActiveStringSet.Command]](
ReplicatedShardingSettings[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
// all replicas
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
// factory for replica settings for a given replica
@ -87,7 +85,7 @@ class ActiveActiveShardingSpec
// use the provided entity type key for sharding to get one sharding instance per replica
Entity(entityTypeKey) { entityContext =>
// factory for the entity for a given entity in that replica
MyActiveActiveStringSet(entityContext.entityId, replicaId, allReplicaIds)
MyReplicatedStringSet(entityContext.entityId, replicaId, allReplicaIds)
}
// potentially use replica id as role or dc in Akka multi dc for the sharding instance
// to control where replicas will live
@ -95,7 +93,7 @@ class ActiveActiveShardingSpec
.withRole(replicaId.id))
}
val aaSharding = ActiveActiveShardingExtension(context.system).init(aaShardingSettings)
val aaSharding = ReplicatedShardingExtension(context.system).init(aaShardingSettings)
// #bootstrap
Behaviors.receiveMessage {
@ -115,7 +113,7 @@ class ActiveActiveShardingSpec
}
}
"Active active sharding" should {
"Replicated sharding" should {
"form a one node cluster" in {
val node = Cluster(system)
@ -128,13 +126,13 @@ class ActiveActiveShardingSpec
"forward to replicas" in {
val proxy = spawn(ProxyActor())
proxy ! ProxyActor.ForwardToAll("id1", MyActiveActiveStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandom("id1", MyActiveActiveStringSet.Add("to-random"))
proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandom("id1", MyReplicatedStringSet.Add("to-random"))
eventually {
val probe = createTestProbe[MyActiveActiveStringSet.Texts]()
proxy ! ProxyActor.ForwardToAll("id1", MyActiveActiveStringSet.GetTexts(probe.ref))
val responses: Seq[MyActiveActiveStringSet.Texts] = probe.receiveMessages(3)
val probe = createTestProbe[MyReplicatedStringSet.Texts]()
proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.GetTexts(probe.ref))
val responses: Seq[MyReplicatedStringSet.Texts] = probe.receiveMessages(3)
val uniqueTexts = responses.flatMap(res => res.texts).toSet
uniqueTexts should ===(Set("to-all", "to-random"))
}

View file

@ -61,11 +61,7 @@ from the events, or publish the events to other services.
## Multi-DC Persistence
@java[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-java)]
@scala[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-scala)]
Illustrates how to use Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
with active-active persistent entities across data centers.
This commercial feature has now been superseeded by @ref[Replicated Event Sourcing](../typed/replicated-eventsourcing.md)
## Cluster with Docker

View file

@ -192,8 +192,8 @@ other data centers.
Especially when used together with Akka Persistence that is based on the single-writer principle
it is important to avoid running the same entity at multiple locations at the same time with a
shared data store. That would result in corrupt data since the events stored by different instances
may be interleaved and would be interpreted differently in a later replay. For active active persistent
entities see Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
may be interleaved and would be interpreted differently in a later replay. For replicated persistent
entities see @ref[Replciated Event Sourcing](replicated-eventsourcing.md).
If you need global entities you have to pick one data center to host that entity type and only start
`ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the

View file

@ -9,7 +9,7 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers
@@@ index
* [persistence](persistence.md)
* [active-active](persistence-active-active.md)
* [active-active](replicated-eventsourcing.md)
* [cqrs](cqrs.md)
* [persistence-style](persistence-style.md)
* [persistence-snapshot](persistence-snapshot.md)
@ -20,6 +20,6 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers
* [persistence-query-leveldb](../persistence-query-leveldb.md)
* [persistence-plugins](../persistence-plugins.md)
* [persistence-journals](../persistence-journals.md)
* [active-active-examples](persistence-active-active-examples.md)
* [active-active-examples](replicated-eventsourcing-examples.md)
@@@

View file

@ -1,6 +1,6 @@
# Active-Active Examples
# Replicated Event Sourcing Examples
The following are more realistic examples of building systems with active-active event sourcing.
The following are more realistic examples of building systems with Replicated Event Sourcing.
## Auction
@ -16,21 +16,21 @@ We are building a small auction service. It has the following operations:
We model those operations as commands to be sent to the auction actor:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #commands }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #commands }
The events:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #events }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #events }
The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event.
Let's have a look at the auction entity that will handle incoming commands:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #command-handler }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #command-handler }
There is nothing specific to active-active about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
There is nothing specific to Replicated Event Sourcing about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
For `OfferBid` and `AuctionFinished` we do nothing more than to emit
events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual
offer of the highest bid here with the amount of the `highestCounterOffer`. This is done to follow the popular auction style where
@ -41,13 +41,13 @@ The initial state is taken from a `AuctionSetup` instance. The minimum bid is mo
an `initialBid`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #setup }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #setup }
The auction moves through the following phases:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #phase }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #phase }
The closing and closed states are to model waiting for all replicas to see the result of the auction before
actually closing the action.
@ -56,7 +56,7 @@ Let's have a look at our state class, `AuctionState` which also represents the C
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #state }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #state }
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid,
and the highest counter offer so far.
@ -95,9 +95,9 @@ all replicas have seen all bids.
In the event handler above, when recovery is not running, it calls `eventTriggers`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #event-triggers }
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #event-triggers }
The event trigger uses the `ActiveActiveContext` to decide when to trigger the Finish of the action.
The event trigger uses the `ReplicationContext` to decide when to trigger the Finish of the action.
When a replica saves the `AuctionFinished` event it checks whether it should close the auction.
For the close to happen the replica must be the one designated to close and all replicas must have
reported that they have finished.

View file

@ -1,4 +1,4 @@
# Active-Active Event Sourcing
# Replicated Event Sourcing
@@@ warning
@ -13,7 +13,7 @@ warning or deprecation period. It is also not recommended to use this module in
This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, `EventSourcedBehaviors`s are unavailable.
Active-active event sourcing enables running multiple replicas of each entity.
Replicated Event Sourcing enables running multiple replicas of each entity.
There is automatic replication of every event persisted to all replicas.
For instance, a replica can be run per:
@ -27,27 +27,27 @@ The motivations are:
* Serve requests from a location near the user to provide better responsiveness
* Balance the load over many servers
However, the event handler must be able to **handle concurrent events** as when active-active is enabled
However, the event handler must be able to **handle concurrent events** as when replication is enabled
there is no longer the single writer principle as there is with a normal `EventSourcedBehavior`.
The state of an active-active `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed
The state of a replicated `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed
due to network partitions and outages and the event handler and those reading the state must be designed to handle this.
To be able to use active active the journal and snapshot store used is required to have specific support for the metadata that active active needs (see @ref[Journal Support](#journal-support))
To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support))
## Relaxing the single writer principle for availability
Taking the example of using active-active to run a replica per data center.
Taking the example of using Replicated Event Sourcing to run a replica per data center.
When there is no network partitions and no concurrent writes the events stored by an `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state.
When there is no network partitions and no concurrent writes the events stored by a `EventSourcedBehavior` at one replica can be replicated and consumed by another (corresponding) replica in another data center without any concerns. Such replicated events can simply be applied to the local state.
![images/replicated-events1.png](images/replicated-events1.png)
The interesting part begins when there are concurrent writes by `EventSourcedBehavior`replicas. That is more likely to happen when there is a network partition, but it can also happen when there are no network issues. They simply write at the "same time" before the events from the other side have been replicated and consumed.
The interesting part begins when there are concurrent writes by `EventSourcedBehavior` replicas. That is more likely to happen when there is a network partition, but it can also happen when there are no network issues. They simply write at the "same time" before the events from the other side have been replicated and consumed.
![images/replicated-events2.png](images/replicated-events2.png)
The event handler logic for applying events to the state of the entity must be aware of that such concurrent updates can occur and it must be modeled to handle such conflicts. This means that it should typically have the same characteristics as a Conflict Free Replicated Data Type (CRDT). With a CRDT there are by definition no conflicts and the events can just be applied. The library provides some general purpose CRDTs, but the logic of how to apply events can also be defined by an application specific function.
The event handler logic for applying events to the state of the entity must be aware of that such concurrent updates can occur, and it must be modeled to handle such conflicts. This means that it should typically have the same characteristics as a Conflict Free Replicated Data Type (CRDT). With a CRDT there are by definition no conflicts, the events can always be applied. The library provides some general purpose CRDTs, but the logic of how to apply events can also be defined by an application specific function.
For example, sometimes it's enough to use application specific timestamps to decide which update should win.
@ -58,44 +58,44 @@ To assist in implementing the event handler active-active detects these conflict
@scala[The same API as regular `EventSourcedBehavior`s]@java[A very similar API to the regular `EventSourcedBehavior`] is used to define the logic.
To enable an entity for active-active
replication @java[let it extend `ActiveActiveEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @apidoc[ActiveActiveEventSourcing].
replication @java[let it extend `ReplicatedEventSourcedBehavior` instead of `EventSourcedBehavior` and] use the factory methods on @scala[`akka.persistence.typed.scaladsl.ReplicatedEventSourcing`]@java[`akka.persistence.typed.javadsl.ReplicatedEventSourcing`].
All replicas need to be known up front:
Scala
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #replicas }
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #replicas }
Java
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #replicas }
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #replicas }
Then to enable replication create the event sourced behavior with the factory method:
Scala
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory }
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory }
Java
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory }
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory }
The factory takes in:
* EntityID: this will be used as part of the underlying persistenceId
* Replica: Which replica this instance is
* All Replicas and the query plugin used to read their events
* A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`]
* A factory function to create an instance of the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`]
In this scenario each replica reads from each other's database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the `withSharedJournal` factory can be used.
Scala
: @@snip [ActiveActiveCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ActiveActiveCompileOnlySpec.scala) { #factory-shared}
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory-shared}
Java
: @@snip [ActiveActiveCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java) { #factory-shared }
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory-shared }
@@@ div { .group-scala }
The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ActiveActiveContext] that has the following methods:
The function passed to both factory methods return an `EventSourcedBehavior` and provide access to the @apidoc[ReplicationContext] that has the following methods:
* entityId
* replicaId
@ -108,8 +108,8 @@ As well as methods that **can only be** used in the event handler. The values th
@@@ div { .group-java }
The function passed to both factory methods is invoked with a special @apidoc[ActiveActiveContext] that needs to be passed to the
concrete `ActiveActiveEventSourcedBehavior` and on to the super constructor.
The function passed to both factory methods is invoked with a special @apidoc[ReplicationContext] that needs to be passed to the
concrete `ReplicatedEventSourcedBehavior` and on to the super constructor.
The context gives access to:
@ -118,7 +118,7 @@ The context gives access to:
* allReplicas
* persistenceId
As well as methods that **can only be** used in the event handler, accessed through `getActiveActiveContext`. The values these methods return relate to the event that is being processed.
As well as methods that **can only be** used in the event handler, accessed through `getReplicationContext`. The values these methods return relate to the event that is being processed.
@@@
@ -190,9 +190,9 @@ There is no built in support for knowing an event has been replicated to all rep
For some use cases you may need to trigger side effects after consuming replicated events. For example when an auction has been closed in
all data centers and all bids have been replicated.
The @api[ActiveActiveContext] contains the current replica, the origin replica for the event processes, and if a recovery is running. These can be used to
The @api[ReplicationContext] contains the current replica, the origin replica for the event processes, and if a recovery is running. These can be used to
implement side effects that take place once events are fully replicated. If the side effect should happen only once then a particular replica can be
designated to do it. The @ref[Auction example](./persistence-active-active-examples.md#auction) uses these techniques.
designated to do it. The @ref[Auction example](./replicated-eventsourcing-examples.md#auction) uses these techniques.
## How it works
@ -203,7 +203,7 @@ You dont have to read this section to be able to use the feature, but to use
Causal delivery order means that events persisted in one data center are read in the same order in other data centers. The order of concurrent events is undefined, which should be no problem
when using [CRDT's](#conflict-free-replicated-data-types)
and otherwise will be detected via the `ActiveActiveContext` concurrent method.
and otherwise will be detected via the `ReplicationContext` concurrent method.
For example:
@ -231,7 +231,7 @@ A third data center may also see the events as either "e1, e3, e2" or as "e1, e2
### Concurrent updates
Active-active automatically tracks causality between events from different replias using [version vectors](https://en.wikipedia.org/wiki/Version_vector).
Replicated Event Sourcing automatically tracks causality between events from different replicas using [version vectors](https://en.wikipedia.org/wiki/Version_vector).
![images/causality.png](images/causality.png)
@ -245,29 +245,29 @@ When comparing two version vectors `v1` and `v2`:
* `v1`is CONCURRENT with `v2` otherwise
## Sharded Active Active entities
## Sharded Replicated Event Sourced entities
To simplify what probably are the most common use cases for how you will want to distribute the active active actors there is a minimal API for running multiple instances of @ref[Akka Cluster Sharding](cluster-sharding.md),
To simplify what probably are the most common use cases for how you will want to distribute the replicated actors there is a minimal API for running multiple instances of @ref[Akka Cluster Sharding](cluster-sharding.md),
each instance holding the entities for a single replica.
The distribution of the replicas can be controlled either through cluster roles or using the @ref[multi datacenter](cluster-dc.md) support in Akka Cluster.
The API consists of bootstrapping logic for starting the sharding instances through @apidoc[ActiveActiveShardingExtension] available from the
The API consists of bootstrapping logic for starting the sharding instances through @apidoc[ReplicatedShardingExtension] available from the
`akka-cluster-sharding-typed` module.
Scala
: @@snip [ActiveActiveShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingSpec.scala) { #bootstrap }
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #bootstrap }
Java
: @@snip [ActiveActiveShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ActiveActiveShardingTest.java) { #bootstrap }
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #bootstrap }
`init` returns an @apidoc[ActiveActiveSharding] instance which gives access to @apidoc[EntityRef]s for each of the replicas for arbitrary routing logic:
`init` returns an @apidoc[ReplicatedSharding] instance which gives access to @apidoc[EntityRef]s for each of the replicas for arbitrary routing logic:
Scala
: @@snip [ActiveActiveShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingSpec.scala) { #all-entity-refs }
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #all-entity-refs }
Java
: @@snip [ActiveActiveShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ActiveActiveShardingTest.java) { #all-entity-refs }
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #all-entity-refs }
More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)).
@ -275,24 +275,24 @@ More advanced routing among the replicas is currently left as an exercise for th
## Direct Replication of Events
Normally an event has to be written in the journal and then picked up by the trailing read journal in the other replicas.
As an optimization the active active events can be published across the Akka cluster to the replicas. The read side
As an optimization the replicated events can be published across the Akka cluster to the replicas. The read side
query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most
events will arrive at the replicas through the cluster.
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`] with `withEventPublishing`
and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using
active active sharding the replication can be run standalone by starting the @apidoc[ActiveActiveShardingDirectReplication] actor).
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
and then enable direct replication through `withDirectReplication()` on @apidoc[ReplicatedShardingSettings] (if not using
replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor).
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,
the `ActiveActiveShardingDirectReplication` actor subscribes to these events and forwards them to the replicas allowing them
the @apidoc[ShardingDirectReplication] actor subscribes to these events and forwards them to the replicas allowing them
to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)).
## Journal Support
For a journal plugin to support active active it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr]
For a journal plugin to support replication it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr]
`metadata` field. To attach the metadata after writing it, `PersistentRepr.withMetadata` is used. The @apidoc[JournalSpec] in the Persistence TCK provides
a capability flag `supportsMetadata` to toggle verification that metadata is handled correctly.
For a snapshot plugin to support active active it needs to store and read metadata for the snapshot if it is defined in the @apiref[akka.persistence.SnapshotMetadata] `metadata` field.
For a snapshot plugin to support replication it needs to store and read metadata for the snapshot if it is defined in the @apiref[akka.persistence.SnapshotMetadata] `metadata` field.
To attach the metadata when reading the snapshot the `akka.persistence.SnapshotMetadata.apply` factory overload taking a `metadata` parameter is used.
The @apidoc[SnapshotStoreSpec] in the Persistence TCK provides a capability flag `supportsMetadata` to toggle verification that metadata is handled correctly.

View file

@ -72,7 +72,7 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
/**
* When `true` enables tests which check if the snapshot store properly stores and
* loads metadata (needed for Active Active) along with the snapshots
* loads metadata (needed for replication) along with the snapshots
*/
protected def supportsMetadata: CapabilityFlag
}

View file

@ -11,7 +11,6 @@ import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.javadsl.PersistenceTestKit;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.javadsl.*;
import com.typesafe.config.ConfigFactory;
@ -25,10 +24,10 @@ import java.util.*;
import static akka.Done.done;
import static org.junit.Assert.assertEquals;
public class ActiveActiveTest extends JUnitSuite {
public class ReplicatedEventSourcingTest extends JUnitSuite {
static final class TestBehavior
extends ActiveActiveEventSourcedBehavior<TestBehavior.Command, String, Set<String>> {
extends ReplicatedEventSourcedBehavior<TestBehavior.Command, String, Set<String>> {
interface Command {}
static final class GetState implements Command {
@ -81,7 +80,7 @@ public class ActiveActiveTest extends JUnitSuite {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ActiveActiveEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
@ -89,8 +88,8 @@ public class ActiveActiveTest extends JUnitSuite {
TestBehavior::new);
}
private TestBehavior(ActiveActiveContext activeActiveContext) {
super(activeActiveContext);
private TestBehavior(ReplicationContext replicationContext) {
super(replicationContext);
}
@Override
@ -124,7 +123,7 @@ public class ActiveActiveTest extends JUnitSuite {
(GetReplica cmd) ->
Effect()
.none()
.thenRun(() -> cmd.replyTo.tell(getActiveActiveContext().replicaId())))
.thenRun(() -> cmd.replyTo.tell(getReplicationContext().replicaId())))
.onCommand(Stop.class, __ -> Effect().stop())
.build();
}
@ -153,9 +152,9 @@ public class ActiveActiveTest extends JUnitSuite {
@Rule public final LogCapturing logCapturing = new LogCapturing();
// minimal test, full coverage over in ActiveActiveSpec
// minimal test, full coverage over in ReplicatedEventSourcingSpec
@Test
public void activeActiveReplicationTest() {
public void replicatedEventSourcingReplicationTest() {
ReplicaId dcA = new ReplicaId("DC-A");
ReplicaId dcB = new ReplicaId("DC-B");
ReplicaId dcC = new ReplicaId("DC-C");

View file

@ -9,7 +9,7 @@ import akka.persistence.typed.javadsl.*;
import java.util.*;
public class ActiveActiveCompileOnlyTest {
public class ReplicatedEventSourcingCompileOnlyTest {
// dummy for docs example
interface Command {}
@ -19,11 +19,11 @@ public class ActiveActiveCompileOnlyTest {
interface State {}
static // #factory
final class MyActiceActiveEventSourcedBehavior
extends ActiveActiveEventSourcedBehavior<Command, Event, State> {
final class MyReplicatedEventSourcedBehavior
extends ReplicatedEventSourcedBehavior<Command, Event, State> {
public MyActiceActiveEventSourcedBehavior(ActiveActiveContext activeActiveContext) {
super(activeActiveContext);
public MyReplicatedEventSourcedBehavior(ReplicationContext replicationContext) {
super(replicationContext);
}
// ... implementation of abstract methods ...
// #factory
@ -58,12 +58,12 @@ public class ActiveActiveCompileOnlyTest {
String queryPluginId = "";
// #factory-shared
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
"entityId",
DCA,
allReplicas,
queryPluginId,
context -> new MyActiceActiveEventSourcedBehavior(context));
context -> new MyReplicatedEventSourcedBehavior(context));
// #factory-shared
// #factory
@ -74,11 +74,11 @@ public class ActiveActiveCompileOnlyTest {
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
EventSourcedBehavior<Command, Event, State> behavior =
ActiveActiveEventSourcing.create(
ReplicatedEventSourcing.create(
"entityId",
DCA,
allReplicasAndQueryPlugins,
context -> new MyActiceActiveEventSourcedBehavior(context));
context -> new MyReplicatedEventSourcedBehavior(context));
// #factory
}
}

View file

@ -14,7 +14,7 @@ import akka.actor.typed.Behavior
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.stream.scaladsl.Sink
@ -23,7 +23,7 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object MultiJournalActiveActiveSpec {
object MultiJournalReplicationSpec {
object Actor {
sealed trait Command
@ -32,7 +32,7 @@ object MultiJournalActiveActiveSpec {
private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal")
def apply(entityId: String, replicaId: String): Behavior[Command] = {
ActiveActiveEventSourcing(
ReplicatedEventSourcing(
entityId,
ReplicaId(replicaId),
Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))(
@ -65,15 +65,15 @@ object MultiJournalActiveActiveSpec {
}
class MultiJournalActiveActiveSpec
extends ScalaTestWithActorTestKit(MultiJournalActiveActiveSpec.separateJournalsConfig)
class MultiJournalReplicationSpec
extends ScalaTestWithActorTestKit(MultiJournalReplicationSpec.separateJournalsConfig)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import MultiJournalActiveActiveSpec._
import MultiJournalReplicationSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
"ActiveActiveEventSourcing" should {
"ReplicatedEventSourcing" should {
"support one journal per replica" in {
val r1 = spawn(Actor("id1", "R1"))

View file

@ -13,14 +13,14 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveEventPublishingSpec {
object ReplicatedEventPublishingSpec {
object MyActiveActive {
object MyReplicatedBehavior {
trait Command
case class Add(text: String, replyTo: ActorRef[Done]) extends Command
case class Get(replyTo: ActorRef[Set[String]]) extends Command
@ -28,7 +28,7 @@ object ActiveActiveEventPublishingSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
Behaviors.setup { ctx =>
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
@ -56,7 +56,7 @@ object ActiveActiveEventPublishingSpec {
}
}
class ActiveActiveEventPublishingSpec
class ReplicatedEventPublishingSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {
@ -71,14 +71,14 @@ class ActiveActiveEventPublishingSpec
s"myId$idCounter"
}
import ActiveActiveEventPublishingSpec._
import ReplicatedEventPublishingSpec._
"An active active actor" must {
"An Replicated Event Sourced actor" must {
"move forward when a published event from a replica is received" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
actor ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
@ -88,18 +88,18 @@ class ActiveActiveEventPublishingSpec
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
actor ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
"ignore a published event from a replica is received but the sequence number is unexpected" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
actor ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
@ -109,18 +109,18 @@ class ActiveActiveEventPublishingSpec
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
actor ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "three"))
}
"ignore a published event from an unknown replica" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
actor ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
@ -130,18 +130,18 @@ class ActiveActiveEventPublishingSpec
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
actor ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "three"))
}
"ignore an already seen event from a replica" in {
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, DCA, Set(DCA, DCB)))
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
actor ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
@ -159,27 +159,27 @@ class ActiveActiveEventPublishingSpec
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
actor ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
"handle published events after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehavior = MyActiveActive(id, DCA, Set(DCA, DCB))
val incarnation1 = spawn(activeActiveBehavior)
incarnation1 ! MyActiveActive.Add("one", probe.ref)
val replicatedBehavior = MyReplicatedBehavior(id, DCA, Set(DCA, DCB))
val incarnation1 = spawn(replicatedBehavior)
incarnation1 ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
incarnation1 ! MyActiveActive.Stop
incarnation1 ! MyReplicatedBehavior.Stop
probe.expectTerminated(incarnation1)
val incarnation2 = spawn(activeActiveBehavior)
val incarnation2 = spawn(replicatedBehavior)
incarnation2 ! MyActiveActive.Get(probe.ref)
incarnation2 ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one"))
// replay completed
@ -191,19 +191,19 @@ class ActiveActiveEventPublishingSpec
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnation2 ! MyActiveActive.Add("three", probe.ref)
incarnation2 ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
incarnation2 ! MyActiveActive.Get(probe.ref)
incarnation2 ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
"handle published events before and after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehaviorA = MyActiveActive(id, DCA, Set(DCA, DCB))
val incarnationA1 = spawn(activeActiveBehaviorA)
incarnationA1 ! MyActiveActive.Add("one", probe.ref)
val replicatedBehaviorA = MyReplicatedBehavior(id, DCA, Set(DCA, DCB))
val incarnationA1 = spawn(replicatedBehaviorA)
incarnationA1 ! MyReplicatedBehavior.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
@ -214,10 +214,10 @@ class ActiveActiveEventPublishingSpec
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnationA1 ! MyActiveActive.Stop
incarnationA1 ! MyReplicatedBehavior.Stop
probe.expectTerminated(incarnationA1)
val incarnationA2 = spawn(activeActiveBehaviorA)
val incarnationA2 = spawn(replicatedBehaviorA)
// simulate a published event from another replica
incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
@ -227,10 +227,10 @@ class ActiveActiveEventPublishingSpec
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
incarnationA2 ! MyActiveActive.Add("four", probe.ref)
incarnationA2 ! MyReplicatedBehavior.Add("four", probe.ref)
probe.expectMessage(Done)
incarnationA2 ! MyActiveActive.Get(probe.ref)
incarnationA2 ! MyReplicatedBehavior.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three", "four"))
}

View file

@ -13,12 +13,12 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveSpec {
object ReplicatedEventSourcingSpec {
val AllReplicas = Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3"))
@ -35,7 +35,7 @@ object ActiveActiveSpec {
testBehavior(entityId, replicaId, Some(probe))
def eventSourcedBehavior(
aaContext: ActiveActiveContext,
aaContext: ReplicationContext,
probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = {
EventSourcedBehavior[Command, String, State](
aaContext.persistenceId,
@ -65,7 +65,7 @@ object ActiveActiveSpec {
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
ReplicaId(replicaId),
AllReplicas,
@ -75,15 +75,15 @@ object ActiveActiveSpec {
case class EventAndContext(event: Any, origin: ReplicaId, recoveryRunning: Boolean, concurrent: Boolean)
class ActiveActiveSpec
class ReplicatedEventSourcingSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ActiveActiveSpec._
import ReplicatedEventSourcingSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
"ActiveActiveEventSourcing" should {
"ReplicatedEventSourcing" should {
"replicate events between entities" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()

View file

@ -11,13 +11,13 @@ import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSn
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveBaseSpec {
object ReplicationBaseSpec {
val R1 = ReplicaId("R1")
val R2 = ReplicaId("R2")
val AllReplicas = Set(R1, R2)
}
abstract class ActiveActiveBaseSpec
abstract class ReplicationBaseSpec
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike

View file

@ -8,12 +8,12 @@ import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestK
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveIllegalAccessSpec {
object ReplicationIllegalAccessSpec {
val R1 = ReplicaId("R1")
val R2 = ReplicaId("R1")
@ -28,11 +28,7 @@ object ActiveActiveIllegalAccessSpec {
case class State(all: List[String]) extends CborSerializable
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ActiveActiveEventSourcing.withSharedJournal(
entityId,
replica,
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(
ReplicatedEventSourcing.withSharedJournal(entityId, replica, AllReplicas, PersistenceTestKitReadJournal.Identifier)(
aaContext =>
EventSourcedBehavior[Command, String, State](
aaContext.persistenceId,
@ -66,30 +62,30 @@ object ActiveActiveIllegalAccessSpec {
}
class ActiveActiveIllegalAccessSpec
class ReplicationIllegalAccessSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ActiveActiveIllegalAccessSpec._
"ActiveActive" should {
import ReplicationIllegalAccessSpec._
"ReplicatedEventSourcing" should {
"detect illegal access to context in command handler" in {
val probe = createTestProbe[Thrown]()
val ref = spawn(ActiveActiveIllegalAccessSpec("id1", R1))
val ref = spawn(ReplicationIllegalAccessSpec("id1", R1))
ref ! AccessInCommandHandler(probe.ref)
val thrown: Throwable = probe.expectMessageType[Thrown].exception.get
thrown.getMessage should include("from the event handler")
}
"detect illegal access to context in persist thenRun" in {
val probe = createTestProbe[Thrown]()
val ref = spawn(ActiveActiveIllegalAccessSpec("id1", R1))
val ref = spawn(ReplicationIllegalAccessSpec("id1", R1))
ref ! AccessInPersistCallback(probe.ref)
val thrown: Throwable = probe.expectMessageType[Thrown].exception.get
thrown.getMessage should include("from the event handler")
}
"detect illegal access in the factory" in {
val exception = intercept[UnsupportedOperationException] {
ActiveActiveEventSourcing.withSharedJournal("id2", R1, AllReplicas, PersistenceTestKitReadJournal.Identifier) {
ReplicatedEventSourcing.withSharedJournal("id2", R1, AllReplicas, PersistenceTestKitReadJournal.Identifier) {
aaContext =>
aaContext.origin
???

View file

@ -12,13 +12,13 @@ import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSn
import akka.persistence.testkit.scaladsl.{ PersistenceTestKit, SnapshotTestKit }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveSnapshotSpec {
object ReplicationSnapshotSpec {
import ActiveActiveSpec._
import ReplicatedEventSourcingSpec._
def behaviorWithSnapshotting(entityId: String, replicaId: ReplicaId): Behavior[Command] =
behaviorWithSnapshotting(entityId, replicaId, None)
@ -33,7 +33,7 @@ object ActiveActiveSnapshotSpec {
entityId: String,
replicaId: ReplicaId,
probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = {
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
AllReplicas,
@ -43,14 +43,14 @@ object ActiveActiveSnapshotSpec {
}
}
class ActiveActiveSnapshotSpec
class ReplicationSnapshotSpec
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ActiveActiveSpec._
import ActiveActiveSnapshotSpec._
import ReplicatedEventSourcingSpec._
import ReplicationSnapshotSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
@ -61,7 +61,7 @@ class ActiveActiveSnapshotSpec
val R1 = ReplicaId("R1")
val R2 = ReplicaId("R2")
"ActiveActive" should {
"ReplicatedEventSourcing" should {
"recover state from snapshots" in {
val entityId = nextEntityId
val persistenceIdR1 = s"$entityId|R1"

View file

@ -8,8 +8,8 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.crdt.CounterSpec.PlainCounter.{ Decrement, Get, Increment }
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
object CounterSpec {
@ -20,7 +20,7 @@ object CounterSpec {
case object Decrement extends Command
}
import ActiveActiveBaseSpec._
import ReplicationBaseSpec._
def apply(
entityId: String,
@ -28,7 +28,7 @@ object CounterSpec {
snapshotEvery: Long = 100,
eventProbe: Option[ActorRef[Counter.Updated]] = None) =
Behaviors.setup[PlainCounter.Command] { context =>
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
AllReplicas,
@ -58,12 +58,12 @@ object CounterSpec {
}
}
class CounterSpec extends ActiveActiveBaseSpec {
class CounterSpec extends ReplicationBaseSpec {
import CounterSpec._
import ActiveActiveBaseSpec._
import ReplicationBaseSpec._
"Active active entity using CRDT counter" should {
"Replicated entity using CRDT counter" should {
"replicate" in {
val id = nextEntityId
val r1 = spawn(apply(id, R1))

View file

@ -6,13 +6,13 @@ package akka.persistence.typed.crdt
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
import akka.serialization.jackson.CborSerializable
object LwwSpec {
import ActiveActiveBaseSpec._
import ReplicationBaseSpec._
sealed trait Command
final case class Update(item: String, timestamp: Long, error: ActorRef[String]) extends Command
@ -26,7 +26,7 @@ object LwwSpec {
object LwwRegistry {
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replica,
AllReplicas,
@ -59,9 +59,9 @@ object LwwSpec {
}
}
class LwwSpec extends ActiveActiveBaseSpec {
class LwwSpec extends ReplicationBaseSpec {
import LwwSpec._
import ActiveActiveBaseSpec._
import ReplicationBaseSpec._
class Setup {
val entityId = nextEntityId
@ -73,7 +73,7 @@ class LwwSpec extends ActiveActiveBaseSpec {
val r2GetProbe = createTestProbe[Registry]()
}
"Lww Active Active Event Sourced Behavior" should {
"Lww Replicated Event Sourced Behavior" should {
"replicate a single event" in new Setup {
r1 ! Update("a1", 1L, r1Probe.ref)
eventually {

View file

@ -6,17 +6,17 @@ package akka.persistence.typed.crdt
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
import ORSetSpec.ORSetEntity._
import akka.persistence.typed.ActiveActiveBaseSpec.{ R1, R2 }
import akka.persistence.typed.ReplicationBaseSpec.{ R1, R2 }
import akka.persistence.typed.crdt.ORSetSpec.ORSetEntity
import scala.util.Random
object ORSetSpec {
import ActiveActiveBaseSpec._
import ReplicationBaseSpec._
object ORSetEntity {
sealed trait Command
@ -27,7 +27,7 @@ object ORSetSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = {
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
entityId,
replica,
AllReplicas,
@ -54,7 +54,7 @@ object ORSetSpec {
}
class ORSetSpec extends ActiveActiveBaseSpec {
class ORSetSpec extends ReplicationBaseSpec {
class Setup {
val entityId = nextEntityId
@ -78,7 +78,7 @@ class ORSetSpec extends ActiveActiveBaseSpec {
Thread.sleep(Random.nextInt(200).toLong)
}
"ORSet Active Active Entity" should {
"ORSet Replicated Entity" should {
"support concurrent updates" in new Setup {
r1 ! Add("a1")

View file

@ -12,13 +12,13 @@ import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
object AAAuctionExampleSpec {
object ReplicatedAuctionExampleSpec {
type MoneyAmount = Int
@ -104,7 +104,7 @@ object AAAuctionExampleSpec {
//#setup
//#command-handler
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)(
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ReplicationContext)(
state: AuctionState,
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
state.phase match {
@ -166,7 +166,7 @@ object AAAuctionExampleSpec {
}
//#event-handler
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)(
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ReplicationContext, setup: AuctionSetup)(
state: AuctionState,
event: AuctionEvent): AuctionState = {
@ -184,7 +184,7 @@ object AAAuctionExampleSpec {
private def eventTriggers(
setup: AuctionSetup,
ctx: ActorContext[AuctionCommand],
aaCtx: ActiveActiveContext,
aaCtx: ReplicationContext,
event: AuctionEvent,
newState: AuctionState) = {
event match {
@ -214,7 +214,7 @@ object AAAuctionExampleSpec {
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx =>
ActiveActiveEventSourcing
ReplicatedEventSourcing
.withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
EventSourcedBehavior(
aaCtx.persistenceId,
@ -225,14 +225,14 @@ object AAAuctionExampleSpec {
}
}
class AAAuctionExampleSpec
class ReplicatedAuctionExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import AAAuctionExampleSpec._
import ReplicatedAuctionExampleSpec._
"Auction example" should {

View file

@ -19,7 +19,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Span }
import org.scalatest.wordspec.AnyWordSpecLike
object AABlogExampleSpec {
object ReplicatedBlogExampleSpec {
final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) {
def withContent(newContent: PostContent, timestamp: LwwTime): BlogState =
@ -44,18 +44,18 @@ object AABlogExampleSpec {
final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends BlogEvent
}
class AABlogExampleSpec
class ReplicatedBlogExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import AABlogExampleSpec._
import ReplicatedBlogExampleSpec._
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
def behavior(aa: ActiveActiveContext, ctx: ActorContext[BlogCommand]) =
def behavior(aa: ReplicationContext, ctx: ActorContext[BlogCommand]) =
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
aa.persistenceId,
emptyState,
@ -114,11 +114,11 @@ class AABlogExampleSpec
val refDcA: ActorRef[BlogCommand] =
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
"cat",
ReplicaId("DC-A"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) =>
PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) =>
behavior(aa, ctx)
}
},
@ -127,11 +127,11 @@ class AABlogExampleSpec
val refDcB: ActorRef[BlogCommand] =
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing.withSharedJournal(
ReplicatedEventSourcing.withSharedJournal(
"cat",
ReplicaId("DC-B"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) =>
PersistenceTestKitReadJournal.Identifier) { (aa: ReplicationContext) =>
behavior(aa, ctx)
}
},

View file

@ -5,11 +5,11 @@
package docs.akka.persistence.typed
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.{ActiveActiveEventSourcing, EventSourcedBehavior}
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, ReplicatedEventSourcing }
import com.github.ghik.silencer.silent
@silent("never used")
object ActiveActiveCompileOnlySpec {
object ReplicatedEventSourcingCompileOnlySpec {
//#replicas
val DCA = ReplicaId("DC-A")
@ -24,13 +24,13 @@ object ActiveActiveCompileOnlySpec {
trait Event
//#factory-shared
ActiveActiveEventSourcing.withSharedJournal("entityId", DCA, AllReplicas, queryPluginId) { context =>
ReplicatedEventSourcing.withSharedJournal("entityId", DCA, AllReplicas, queryPluginId) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
//#factory-shared
//#factory
ActiveActiveEventSourcing("entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
ReplicatedEventSourcing("entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
//#factory

View file

@ -1,4 +1,4 @@
# Changes to internal/private/do not extend
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withActiveActive")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withReplication")
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.*")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing")

View file

@ -1,8 +1,8 @@
akka.actor {
serialization-identifiers."akka.persistence.typed.serialization.ActiveActiveSerializer" = 40
serialization-identifiers."akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer" = 40
serializers.active-active = "akka.persistence.typed.serialization.ActiveActiveSerializer"
serializers.active-active = "akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer"
serialization-bindings {
"akka.persistence.typed.internal.VersionVector" = active-active

View file

@ -17,10 +17,10 @@ import akka.persistence.typed.internal.ReplicatedPublishedEventMetaData
@DoNotInherit
trait PublishedEvent {
/** Scala API: When emitted from an Active Active actor this will contain the replica id */
/** Scala API: When emitted from an Replicated Event Sourcing actor this will contain the replica id */
def replicatedMetaData: Option[ReplicatedPublishedEventMetaData]
/** Java API: When emitted from an Active Active actor this will contain the replica id */
/** Java API: When emitted from an Replicated Event Sourcing actor this will contain the replica id */
def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData]
def persistenceId: PersistenceId

View file

@ -5,6 +5,6 @@
package akka.persistence.typed
/**
* Identifies a replica in Active Active eventsourcing, could be a datacenter name or a logical identifier.
* Identifies a replica in Replicated Event Sourcing, could be a datacenter name or a logical identifier.
*/
final case class ReplicaId(id: String)

View file

@ -13,7 +13,6 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter }
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }
import akka.util.OptionVal
@ -49,7 +48,7 @@ private[akka] final class BehaviorSetup[C, E, S](
var holdingRecoveryPermit: Boolean,
val settings: EventSourcedSettings,
val stashState: StashState,
val activeActive: Option[ActiveActive],
val replication: Option[ReplicationSetup],
val publishEvents: Boolean) {
import BehaviorSetup._
@ -62,7 +61,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val replicaId: Option[ReplicaId] = activeActive.map(_.replicaId)
val replicaId: Option[ReplicaId] = replication.map(_.replicaId)
def selfClassic: ClassicActorRef = context.self.toClassic

View file

@ -40,7 +40,6 @@ import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.persistence.typed.scaladsl._
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
import akka.persistence.typed.scaladsl.RetentionCriteria
@ -93,7 +92,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
retention: RetentionCriteria = RetentionCriteria.disabled,
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty,
activeActive: Option[ActiveActive] = None,
replication: Option[ReplicationSetup] = None,
publishEvents: Boolean = false)
extends EventSourcedBehavior[Command, Event, State] {
@ -158,7 +157,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
holdingRecoveryPermit = false,
settings = settings,
stashState = stashState,
activeActive = activeActive,
replication = replication,
publishEvents = publishEvents)
// needs to accept Any since we also can get messages from the journal
@ -251,9 +250,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(publishEvents = true)
}
override private[akka] def withActiveActive(
context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State] = {
copy(activeActive = Some(ActiveActive(context.replicaId, context.replicasAndQueryPlugins, context)))
override private[akka] def withReplication(
context: ReplicationContextImpl): EventSourcedBehavior[Command, Event, State] = {
copy(replication = Some(ReplicationSetup(context.replicaId, context.replicasAndQueryPlugins, context)))
}
}
@ -274,7 +273,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
object ReplicatedEventMetadata {
/**
* For a journal supporting active active needing to add test coverage, use this instance as metadata and defer
* For a journal supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange
@ -297,7 +296,7 @@ private[akka] final case class ReplicatedEventMetadata(
object ReplicatedSnapshotMetadata {
/**
* For a snapshot store supporting active active needing to add test coverage, use this instance as metadata and defer
* For a snapshot store supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange

View file

@ -191,7 +191,7 @@ private[akka] trait SnapshotInteractions[C, E, S] {
if (state.state == null)
throw new IllegalStateException("A snapshot must not be a null state.")
else {
val meta = setup.activeActive match {
val meta = setup.replication match {
case Some(_) =>
val m = ReplicatedSnapshotMetadata(state.version, state.seenPerReplica)
Some(m)

View file

@ -23,7 +23,6 @@ import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.util.OptionVal
import akka.util.PrettyDuration._
import akka.util.unused
@ -123,14 +122,14 @@ private[akka] final class ReplayingEvents[C, E, S](
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr)
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ActiveActive)] =
setup.activeActive match {
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] =
setup.replication match {
case Some(aa) =>
val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetadata]
case None =>
throw new IllegalStateException(
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")
s"Replicated Event Sourcing enabled but existing event has no metadata. Migration isn't supported yet.")
}
aa.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent)
@ -140,7 +139,7 @@ private[akka] final class ReplayingEvents[C, E, S](
val newState = setup.eventHandler(state.state, event)
setup.activeActive match {
setup.replication match {
case Some(aa) =>
aa.clearContext()
case None =>

View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.util.OptionVal
import akka.util.WallClock
import akka.util.ccompat.JavaConverters._
/**
* INTERNAL API
*/
// FIXME, parts of this can be set during initialisation
// Other fields will be set before executing the event handler as they change per event
// https://github.com/akka/akka/issues/29258
@InternalApi
private[akka] final class ReplicationContextImpl(
val entityId: String,
val replicaId: ReplicaId,
val replicasAndQueryPlugins: Map[ReplicaId, String])
extends akka.persistence.typed.scaladsl.ReplicationContext
with akka.persistence.typed.javadsl.ReplicationContext {
val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet
// these are not volatile as they are set on the same thread as they should be accessed
var _currentThread: OptionVal[Thread] = OptionVal.None
var _origin: OptionVal[ReplicaId] = OptionVal.None
var _recoveryRunning: Boolean = false
var _concurrent: Boolean = false
private def checkAccess(functionName: String): Unit = {
val callerThread = Thread.currentThread()
def error() =
throw new UnsupportedOperationException(
s"Unsupported access to ReplicationContext operation from the outside of event handler. " +
s"$functionName can only be called from the event handler")
_currentThread match {
case OptionVal.Some(t) =>
if (callerThread ne t) error()
case OptionVal.None =>
error()
}
}
/**
* The origin of the current event.
* Undefined result if called from anywhere other than an event handler.
*/
override def origin: ReplicaId = {
checkAccess("origin")
_origin match {
case OptionVal.Some(origin) => origin
case OptionVal.None => throw new IllegalStateException("origin can only be accessed from the event handler")
}
}
/**
* Whether the happened concurrently with an event from another replica.
* Undefined result if called from any where other than an event handler.
*/
override def concurrent: Boolean = {
checkAccess("concurrent")
_concurrent
}
override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId)
override def currentTimeMillis(): Long = {
WallClock.AlwaysIncreasingClock.currentTimeMillis()
}
override def recoveryRunning: Boolean = {
checkAccess("recoveryRunning")
_recoveryRunning
}
override def getAllReplicas: java.util.Set[ReplicaId] = allReplicas.asJava
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] final case class ReplicationSetup(
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String],
aaContext: ReplicationContextImpl) {
val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet
/**
* Must only be called on the same thread that will execute the user code
*/
def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = {
aaContext._currentThread = OptionVal.Some(Thread.currentThread())
aaContext._recoveryRunning = recoveryRunning
aaContext._concurrent = concurrent
aaContext._origin = OptionVal.Some(originReplica)
}
def clearContext(): Unit = {
aaContext._currentThread = OptionVal.None
aaContext._recoveryRunning = false
aaContext._concurrent = false
aaContext._origin = OptionVal.None
}
}

View file

@ -53,7 +53,6 @@ import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope
import akka.persistence.typed.internal.JournalInteractions.EventToPersist
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.scaladsl.Keep
import akka.stream.SystemMaterializer
import akka.stream.WatchedActorTerminatedException
@ -111,7 +110,7 @@ private[akka] object Running {
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
val initialState = setup.activeActive match {
val initialState = setup.replication match {
case Some(aa) => startReplicationStream(setup, state, aa)
case None => state
}
@ -121,17 +120,17 @@ private[akka] object Running {
def startReplicationStream[C, E, S](
setup: BehaviorSetup[C, E, S],
state: RunningState[S],
aa: ActiveActive): RunningState[S] = {
replicationSetup: ReplicationSetup): RunningState[S] = {
import scala.concurrent.duration._
val system = setup.context.system
val ref = setup.context.self
val query = PersistenceQuery(system)
aa.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != aa.replicaId) {
replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != replicationSetup.replicaId) {
val seqNr = state.seenPerReplica(replicaId)
val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replicaId)
val queryPluginId = aa.allReplicasAndQueryPlugins(replicaId)
val pid = PersistenceId.replicatedUniqueId(replicationSetup.aaContext.entityId, replicaId)
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
implicit val timeout = Timeout(30.seconds)
@ -152,7 +151,7 @@ private[akka] object Running {
s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " +
s"(sequence nr ${event.sequenceNr}) without replication metadata. " +
s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " +
"used that does not support active active?")
"used that does not support Replicated Event Sourcing?")
})
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
@ -240,7 +239,7 @@ private[akka] object Running {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.activeActive.get)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.replication.get)
case pe: PublishedEventImpl => onPublishedEvent(state, pe)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
@ -267,19 +266,19 @@ private[akka] object Running {
def onReplicatedEvent(
state: Running.RunningState[S],
envelope: ReplicatedEventEnvelope[E],
activeActive: ActiveActive): Behavior[InternalProtocol] = {
replication: ReplicationSetup): Behavior[InternalProtocol] = {
setup.log.infoN(
"Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}",
setup.activeActive,
setup.replication,
state.seenPerReplica,
envelope)
envelope.ack ! ReplicatedEventAck
if (envelope.event.originReplica != activeActive.replicaId && !alreadySeen(envelope.event)) {
if (envelope.event.originReplica != replication.replicaId && !alreadySeen(envelope.event)) {
setup.log.debug(
"Saving event [{}] from [{}] as first time",
envelope.event.originSequenceNr,
envelope.event.originReplica)
handleExternalReplicatedEventPersist(activeActive, envelope.event)
handleExternalReplicatedEventPersist(replication, envelope.event)
} else {
setup.log.debug(
"Filtering event [{}] from [{}] as it was already seen",
@ -290,19 +289,20 @@ private[akka] object Running {
}
def onPublishedEvent(state: Running.RunningState[S], event: PublishedEventImpl): Behavior[InternalProtocol] = {
val newBehavior: Behavior[InternalProtocol] = setup.activeActive match {
val newBehavior: Behavior[InternalProtocol] = setup.replication match {
case None =>
setup.log
.warn("Received published event for [{}] but not an active active actor, dropping", event.persistenceId)
setup.log.warn(
"Received published event for [{}] but not an Replicated Event Sourcing actor, dropping",
event.persistenceId)
this
case Some(activeActive) =>
case Some(replication) =>
event.replicatedMetaData match {
case None =>
setup.log.warn("Received published event for [{}] but with no replicated metadata, dropping")
this
case Some(replicatedEventMetaData) =>
onPublishedEvent(state, activeActive, replicatedEventMetaData, event)
onPublishedEvent(state, replication, replicatedEventMetaData, event)
}
}
tryUnstashOne(newBehavior)
@ -310,7 +310,7 @@ private[akka] object Running {
private def onPublishedEvent(
state: Running.RunningState[S],
activeActive: ActiveActive,
replication: ReplicationSetup,
replicatedMetadata: ReplicatedPublishedEventMetaData,
event: PublishedEventImpl): Behavior[InternalProtocol] = {
val log = setup.log
@ -320,18 +320,18 @@ private[akka] object Running {
if (!setup.persistenceId.id.startsWith(idPrefix)) {
log.warn("Ignoring published replicated event for the wrong actor [{}]", event.persistenceId)
this
} else if (originReplicaId == activeActive.replicaId) {
} else if (originReplicaId == replication.replicaId) {
if (log.isDebugEnabled)
log.debug(
"Ignoring published replicated event with seqNr [{}] from our own replica id [{}]",
event.sequenceNumber,
originReplicaId)
this
} else if (!activeActive.allReplicas.contains(originReplicaId)) {
} else if (!replication.allReplicas.contains(originReplicaId)) {
log.warnN(
"Received published replicated event from replica [{}], which is unknown. Active active must be set up with a list of all replicas (known are [{}]).",
"Received published replicated event from replica [{}], which is unknown. Replicated Event Sourcing must be set up with a list of all replicas (known are [{}]).",
originReplicaId,
activeActive.allReplicas.mkString(", "))
replication.allReplicas.mkString(", "))
this
} else {
val expectedSequenceNumber = state.seenPerReplica(originReplicaId) + 1
@ -369,7 +369,7 @@ private[akka] object Running {
state.replicationControl.get(originReplicaId).foreach(_.fastForward(event.sequenceNumber))
handleExternalReplicatedEventPersist(
activeActive,
replication,
ReplicatedEvent(
event.event.asInstanceOf[E],
originReplicaId,
@ -386,15 +386,15 @@ private[akka] object Running {
this
}
def withContext[A](aa: ActiveActive, withActiveActive: ActiveActive => Unit, f: () => A): A = {
withActiveActive(aa)
def withContext[A](aa: ReplicationSetup, withReplication: ReplicationSetup => Unit, f: () => A): A = {
withReplication(aa)
val result = f()
aa.clearContext()
result
}
private def handleExternalReplicatedEventPersist(
activeActive: ActiveActive,
replication: ReplicationSetup,
event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1
val isConcurrent: Boolean = event.originVersion <> state.version
@ -410,7 +410,7 @@ private[akka] object Running {
isConcurrent)
val newState: RunningState[S] = withContext(
activeActive,
replication,
aa => aa.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent),
() => state.applyEvent(setup, event.event))
@ -442,7 +442,7 @@ private[akka] object Running {
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1
val newState: RunningState[S] = setup.activeActive match {
val newState: RunningState[S] = setup.replication match {
case Some(aa) =>
// set concurrent to false, local events are never concurrent
withContext(
@ -456,7 +456,7 @@ private[akka] object Running {
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.activeActive match {
val newState2 = setup.replication match {
case Some(aa) =>
val updatedVersion = newState.version.updated(aa.replicaId.id, _currentSequenceNumber)
val r = internalPersist(
@ -494,7 +494,7 @@ private[akka] object Running {
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr
val metadataTemplate: Option[ReplicatedEventMetadata] = setup.activeActive match {
val metadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match {
case Some(aa) =>
aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent
Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later
@ -524,7 +524,7 @@ private[akka] object Running {
case None => None
}
currentState = setup.activeActive match {
currentState = setup.replication match {
case Some(aa) =>
withContext(
aa,
@ -679,7 +679,7 @@ private[akka] object Running {
onWriteSuccess(setup.context, p)
if (setup.publishEvents) {
val meta = setup.activeActive.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version))
val meta = setup.replication.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version))
context.system.eventStream ! EventStream.Publish(
PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta))
}

View file

@ -179,7 +179,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
* INTERNAL API: DeferredBehavior init, not for user extension
*/
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
// Note: duplicated in ActiveActiveEventSourcedBehavior to not break source compatibility
// Note: duplicated in ReplicatedEventSourcedBehavior to not break source compatibility
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
val tagger: Event => Set[String] = { event =>

View file

@ -12,23 +12,26 @@ import akka.actor.typed.TypedActorContext
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.typed.internal
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.persistence.typed.internal.EffectImpl
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
/**
* Base class for replicated event sourced behaviors.
*/
@ApiMayChange
abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
activeActiveContext: ActiveActiveContext,
abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
replicationContext: ReplicationContext,
onPersistFailure: Optional[BackoffSupervisorStrategy])
extends EventSourcedBehavior[Command, Event, State](activeActiveContext.persistenceId, onPersistFailure) {
extends EventSourcedBehavior[Command, Event, State](replicationContext.persistenceId, onPersistFailure) {
def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty())
def this(replicationContext: ReplicationContext) = this(replicationContext, Optional.empty())
/**
* Override and return true to publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted
*/
def withEventPublishing: Boolean = false
protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext
protected def getReplicationContext(): ReplicationContext = replicationContext
/**
* INTERNAL API: DeferredBehavior init, not for user extension
@ -59,7 +62,7 @@ abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
.withSnapshotPluginId(snapshotPluginId)
.withRecovery(recovery.asScala)
// context not user extendable so there should never be any other impls
.withActiveActive(activeActiveContext.asInstanceOf[ActiveActiveContextImpl])
.withReplication(replicationContext.asInstanceOf[ReplicationContextImpl])
val handler = signalHandler()
val behaviorWithSignalHandler =

View file

@ -11,36 +11,63 @@ import java.util.{ Map => JMap }
import akka.annotation.DoNotInherit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.util.ccompat.JavaConverters._
/**
* Provides access to Active Active specific state
* Provides access to replication specific state
*
* Not for user extension
*/
@DoNotInherit
trait ActiveActiveContext {
def origin: ReplicaId
def concurrent: Boolean
trait ReplicationContext {
/**
* @return The replica id of this replicated event sourced actor
*/
def replicaId: ReplicaId
/**
* @return The ids of all replicas of this replicated event sourced actor
*/
def getAllReplicas: JSet[ReplicaId]
/**
* @return The unique id of this replica, including the replica id
*/
def persistenceId: PersistenceId
def recoveryRunning: Boolean
/**
* @return The unique id of this replica, not including the replica id
*/
def entityId: String
/**
* Must only be called from the event handler
* @return true when the event handler is invoked during recovery.
*/
def recoveryRunning: Boolean
/**
* Must only be called from the event handler
* @return the replica id where the current event was persisted
*/
def origin: ReplicaId
/**
* Must only be called from the event handler
* @return true if this event happened concurrent with an event from another replica
*/
def concurrent: Boolean
/**
* @return a timestamp that will always be increasing (is monotonic)
*/
def currentTimeMillis(): Long
}
/**
* Factory to create an instance of an ActiveActiveEventSourcedBehavior
*/
@FunctionalInterface
trait ActiveActiveBehaviorFactory[Command, Event, State] {
def apply(aaContext: ActiveActiveContext): ActiveActiveEventSourcedBehavior[Command, Event, State]
}
object ActiveActiveEventSourcing {
object ReplicatedEventSourcing {
/**
* Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal.
@ -63,7 +90,7 @@ object ActiveActiveEventSourcing {
replicaId: ReplicaId,
allReplicaIds: JSet[ReplicaId],
queryPluginId: String,
behaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]])
behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])
: EventSourcedBehavior[Command, Event, State] =
create(entityId, replicaId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory)
@ -87,9 +114,9 @@ object ActiveActiveEventSourcing {
entityId: String,
replicaId: ReplicaId,
allReplicasAndQueryPlugins: JMap[ReplicaId, String],
eventSourcedBehaviorFactory: JFunction[ActiveActiveContext, EventSourcedBehavior[Command, Event, State]])
eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap)
val context = new ReplicationContextImpl(entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap)
eventSourcedBehaviorFactory(context)
}

View file

@ -4,7 +4,6 @@
package akka.persistence.typed.scaladsl
import scala.annotation.tailrec
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
@ -13,45 +12,18 @@ import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.ApiMayChange
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal._
import akka.util.OptionVal
import scala.annotation.tailrec
object EventSourcedBehavior {
// FIXME move to internal
@InternalApi
private[akka] final case class ActiveActive(
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String],
aaContext: ActiveActiveContextImpl) {
val allReplicas: Set[ReplicaId] = allReplicasAndQueryPlugins.keySet
/**
* Must only be called on the same thread that will execute the user code
*/
def setContext(recoveryRunning: Boolean, originReplica: ReplicaId, concurrent: Boolean): Unit = {
aaContext._currentThread = OptionVal.Some(Thread.currentThread())
aaContext._recoveryRunning = recoveryRunning
aaContext._concurrent = concurrent
aaContext._origin = originReplica
}
def clearContext(): Unit = {
aaContext._currentThread = OptionVal.None
aaContext._recoveryRunning = false
aaContext._concurrent = false
aaContext._origin = null
}
}
/**
* Type alias for the command handler function that defines how to act on commands.
*
@ -175,8 +147,6 @@ object EventSourcedBehavior {
*/
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State]
private[akka] def withActiveActive(context: ActiveActiveContextImpl): EventSourcedBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.
*/
@ -253,4 +223,10 @@ object EventSourcedBehavior {
*/
@ApiMayChange
def withEventPublishing(): EventSourcedBehavior[Command, Event, State]
/**
* INTERNAL API
*/
@InternalApi
private[akka] def withReplication(context: ReplicationContextImpl): EventSourcedBehavior[Command, Event, State]
}

View file

@ -4,90 +4,65 @@
package akka.persistence.typed.scaladsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.util.{ OptionVal, WallClock }
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.util.ccompat.JavaConverters._
// FIXME docs
trait ActiveActiveContext {
/**
* Provides access to replication specific state
*
* Not for user extension
*/
@DoNotInherit
trait ReplicationContext {
/**
* @return The unique id of this replica, including the replica id
*/
def persistenceId: PersistenceId
/**
* @return The replica id of this replicated event sourced actor
*/
def replicaId: ReplicaId
/**
* @return The ids of all replicas of this replicated event sourced actor
*/
def allReplicas: Set[ReplicaId]
/**
* @return The entity id of this replicated event sourced actor (not including the replica id)
*/
def entityId: String
/**
* Must only be called from the event handler
* @return the replica id where the current event was persisted
*/
def origin: ReplicaId
/**
* Must only be called from the event handler
* @return true if this event happened concurrent with an event from another replica
*/
def concurrent: Boolean
/**
* Must only be called from the event handler
* @return true when the event handler is invoked during recovery.
*/
def recoveryRunning: Boolean
/**
* @return a timestamp that will always be increasing (is monotonic)
*/
def currentTimeMillis(): Long
}
// FIXME, parts of this can be set during initialisation
// Other fields will be set before executing the event handler as they change per event
// https://github.com/akka/akka/issues/29258
private[akka] class ActiveActiveContextImpl(
val entityId: String,
val replicaId: ReplicaId,
val replicasAndQueryPlugins: Map[ReplicaId, String])
extends ActiveActiveContext
with akka.persistence.typed.javadsl.ActiveActiveContext {
val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet
// these are not volatile as they are set on the same thread as they should be accessed
var _origin: ReplicaId = null
var _recoveryRunning: Boolean = false
var _concurrent: Boolean = false
var _currentThread: OptionVal[Thread] = OptionVal.None
private def checkAccess(functionName: String): Unit = {
val callerThread = Thread.currentThread()
def error() =
throw new UnsupportedOperationException(
s"Unsupported access to ActiveActiveContext operation from the outside of event handler. " +
s"$functionName can only be called from the event handler")
_currentThread match {
case OptionVal.Some(t) =>
if (callerThread ne t) error()
case OptionVal.None =>
error()
}
}
/**
* The origin of the current event.
* Undefined result if called from anywhere other than an event handler.
*/
override def origin: ReplicaId = {
checkAccess("origin")
_origin
}
/**
* Whether the happened concurrently with an event from another replica.
* Undefined result if called from any where other than an event handler.
*/
override def concurrent: Boolean = {
checkAccess("concurrent")
_concurrent
}
override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId)
override def currentTimeMillis(): Long = {
WallClock.AlwaysIncreasingClock.currentTimeMillis()
}
override def recoveryRunning: Boolean = {
checkAccess("recoveryRunning")
_recoveryRunning
}
override def getAllReplicas: java.util.Set[ReplicaId] = allReplicas.asJava
}
object ActiveActiveEventSourcing {
object ReplicatedEventSourcing {
/**
* Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal.
@ -110,7 +85,7 @@ object ActiveActiveEventSourcing {
replicaId: ReplicaId,
allReplicaIds: Set[ReplicaId],
queryPluginId: String)(
eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] =
apply(entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory)
@ -134,10 +109,10 @@ object ActiveActiveEventSourcing {
entityId: String,
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String])(
eventSourcedBehaviorFactory: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicasAndQueryPlugins)
eventSourcedBehaviorFactory(context).withActiveActive(context)
val context = new ReplicationContextImpl(entityId, replicaId, allReplicasAndQueryPlugins)
eventSourcedBehaviorFactory(context).withReplication(context)
}
}

View file

@ -22,9 +22,13 @@ import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import scala.annotation.tailrec
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable.TreeMap
object ActiveActiveSerializer {
/**
* INTERNAL API
*/
@InternalApi private[akka] object ReplicatedEventSourcingSerializer {
object Comparator extends Comparator[Payload] {
override def compare(a: Payload, b: Payload): Int = {
val aByteString = a.getEnclosedMessage
@ -53,7 +57,7 @@ object ActiveActiveSerializer {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ActiveActiveSerializer(val system: ExtendedActorSystem)
@InternalApi private[akka] final class ReplicatedEventSourcingSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer {
@ -132,23 +136,34 @@ object ActiveActiveSerializer {
}
def counterFromBinary(bytes: Array[Byte]): Counter =
Counter(BigInt(ActiveActive.Counter.parseFrom(bytes).getValue.toByteArray))
Counter(BigInt(ReplicatedEventSourcing.Counter.parseFrom(bytes).getValue.toByteArray))
def counterUpdatedFromBinary(bytes: Array[Byte]): Counter.Updated =
Counter.Updated(BigInt(ActiveActive.CounterUpdate.parseFrom(bytes).getDelta.toByteArray))
Counter.Updated(BigInt(ReplicatedEventSourcing.CounterUpdate.parseFrom(bytes).getDelta.toByteArray))
def counterToProtoByteArray(counter: Counter): Array[Byte] =
ActiveActive.Counter.newBuilder().setValue(ByteString.copyFrom(counter.value.toByteArray)).build().toByteArray
ReplicatedEventSourcing.Counter
.newBuilder()
.setValue(ByteString.copyFrom(counter.value.toByteArray))
.build()
.toByteArray
def counterUpdatedToProtoBufByteArray(updated: Counter.Updated): Array[Byte] =
ActiveActive.CounterUpdate.newBuilder().setDelta(ByteString.copyFrom(updated.delta.toByteArray)).build().toByteArray
ReplicatedEventSourcing.CounterUpdate
.newBuilder()
.setDelta(ByteString.copyFrom(updated.delta.toByteArray))
.build()
.toByteArray
def orsetToProto(orset: ORSet[_]): ActiveActive.ORSet =
def orsetToProto(orset: ORSet[_]): ReplicatedEventSourcing.ORSet =
orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]])
private def orsetToProtoImpl(orset: ORSet[Any]): ActiveActive.ORSet = {
private def orsetToProtoImpl(orset: ORSet[Any]): ReplicatedEventSourcing.ORSet = {
val b =
ActiveActive.ORSet.newBuilder().setOriginDc(orset.originReplica).setVvector(versionVectorToProto(orset.vvector))
ReplicatedEventSourcing.ORSet
.newBuilder()
.setOriginDc(orset.originReplica)
.setVvector(versionVectorToProto(orset.vvector))
// using java collections and sorting for performance (avoid conversions)
val stringElements = new ArrayList[String]
val intElements = new ArrayList[Integer]
@ -194,7 +209,7 @@ object ActiveActiveSerializer {
addDots(longElements)
}
if (!otherElements.isEmpty) {
Collections.sort(otherElements, ActiveActiveSerializer.Comparator)
Collections.sort(otherElements, ReplicatedEventSourcingSerializer.Comparator)
b.addAllOtherElements(otherElements)
addDots(otherElements)
}
@ -203,7 +218,7 @@ object ActiveActiveSerializer {
}
def replicatedEventMetadataToProtoByteArray(rem: ReplicatedEventMetadata): Array[Byte] = {
ActiveActive.ReplicatedEventMetadata
ReplicatedEventSourcing.ReplicatedEventMetadata
.newBuilder()
.setOriginSequenceNr(rem.originSequenceNr)
.setConcurrent(rem.concurrent)
@ -214,7 +229,7 @@ object ActiveActiveSerializer {
}
def replicatedSnapshotMetadataToByteArray(rsm: ReplicatedSnapshotMetadata): Array[Byte] = {
ActiveActive.ReplicatedSnapshotMetadata
ReplicatedEventSourcing.ReplicatedSnapshotMetadata
.newBuilder()
.setVersion(versionVectorToProto(rsm.version))
.addAllSeenPerReplica(rsm.seenPerReplica.map(seenToProto).asJava)
@ -222,35 +237,39 @@ object ActiveActiveSerializer {
.toByteArray
}
def seenToProto(t: (ReplicaId, Long)): ActiveActive.ReplicatedSnapshotMetadata.Seen = {
ActiveActive.ReplicatedSnapshotMetadata.Seen.newBuilder().setReplicaId(t._1.id).setSequenceNr(t._2).build()
def seenToProto(t: (ReplicaId, Long)): ReplicatedEventSourcing.ReplicatedSnapshotMetadata.Seen = {
ReplicatedEventSourcing.ReplicatedSnapshotMetadata.Seen
.newBuilder()
.setReplicaId(t._1.id)
.setSequenceNr(t._2)
.build()
}
def orsetFromBinary(bytes: Array[Byte]): ORSet[Any] =
orsetFromProto(ActiveActive.ORSet.parseFrom(bytes))
orsetFromProto(ReplicatedEventSourcing.ORSet.parseFrom(bytes))
private def orsetAddFromBinary(bytes: Array[Byte]): ORSet.AddDeltaOp[Any] =
new ORSet.AddDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
new ORSet.AddDeltaOp(orsetFromProto(ReplicatedEventSourcing.ORSet.parseFrom(bytes)))
private def orsetRemoveFromBinary(bytes: Array[Byte]): ORSet.RemoveDeltaOp[Any] =
new ORSet.RemoveDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
new ORSet.RemoveDeltaOp(orsetFromProto(ReplicatedEventSourcing.ORSet.parseFrom(bytes)))
private def orsetFullFromBinary(bytes: Array[Byte]): ORSet.FullStateDeltaOp[Any] =
new ORSet.FullStateDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
new ORSet.FullStateDeltaOp(orsetFromProto(ReplicatedEventSourcing.ORSet.parseFrom(bytes)))
private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): ActiveActive.ORSetDeltaGroup = {
def createEntry(opType: ActiveActive.ORSetDeltaOp, u: ORSet[_]) = {
ActiveActive.ORSetDeltaGroup.Entry.newBuilder().setOperation(opType).setUnderlying(orsetToProto(u))
private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): ReplicatedEventSourcing.ORSetDeltaGroup = {
def createEntry(opType: ReplicatedEventSourcing.ORSetDeltaOp, u: ORSet[_]) = {
ReplicatedEventSourcing.ORSetDeltaGroup.Entry.newBuilder().setOperation(opType).setUnderlying(orsetToProto(u))
}
val b = ActiveActive.ORSetDeltaGroup.newBuilder()
val b = ReplicatedEventSourcing.ORSetDeltaGroup.newBuilder()
deltaGroup.ops.foreach {
case ORSet.AddDeltaOp(u) =>
b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Add, u))
b.addEntries(createEntry(ReplicatedEventSourcing.ORSetDeltaOp.Add, u))
case ORSet.RemoveDeltaOp(u) =>
b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Remove, u))
b.addEntries(createEntry(ReplicatedEventSourcing.ORSetDeltaOp.Remove, u))
case ORSet.FullStateDeltaOp(u) =>
b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Full, u))
b.addEntries(createEntry(ReplicatedEventSourcing.ORSetDeltaOp.Full, u))
case ORSet.DeltaGroup(_) =>
throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
}
@ -258,14 +277,14 @@ object ActiveActiveSerializer {
}
private def orsetDeltaGroupFromBinary(bytes: Array[Byte]): ORSet.DeltaGroup[Any] = {
val deltaGroup = ActiveActive.ORSetDeltaGroup.parseFrom(bytes)
val deltaGroup = ReplicatedEventSourcing.ORSetDeltaGroup.parseFrom(bytes)
val ops: Vector[ORSet.DeltaOp] =
deltaGroup.getEntriesList.asScala.map { entry =>
if (entry.getOperation == ActiveActive.ORSetDeltaOp.Add)
if (entry.getOperation == ReplicatedEventSourcing.ORSetDeltaOp.Add)
ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying))
else if (entry.getOperation == ActiveActive.ORSetDeltaOp.Remove)
else if (entry.getOperation == ReplicatedEventSourcing.ORSetDeltaOp.Remove)
ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying))
else if (entry.getOperation == ActiveActive.ORSetDeltaOp.Full)
else if (entry.getOperation == ReplicatedEventSourcing.ORSetDeltaOp.Full)
ORSet.FullStateDeltaOp(orsetFromProto(entry.getUnderlying))
else
throw new NotSerializableException(s"Unknow ORSet delta operation ${entry.getOperation}")
@ -273,7 +292,7 @@ object ActiveActiveSerializer {
ORSet.DeltaGroup(ops)
}
def orsetFromProto(orset: ActiveActive.ORSet): ORSet[Any] = {
def orsetFromProto(orset: ReplicatedEventSourcing.ORSet): ORSet[Any] = {
val elements: Iterator[Any] =
(orset.getStringElementsList.iterator.asScala ++
orset.getIntElementsList.iterator.asScala ++
@ -286,18 +305,19 @@ object ActiveActiveSerializer {
new ORSet(orset.getOriginDc, elementsMap, vvector = versionVectorFromProto(orset.getVvector))
}
def versionVectorToProto(versionVector: VersionVector): ActiveActive.VersionVector = {
val b = ActiveActive.VersionVector.newBuilder()
def versionVectorToProto(versionVector: VersionVector): ReplicatedEventSourcing.VersionVector = {
val b = ReplicatedEventSourcing.VersionVector.newBuilder()
versionVector.versionsIterator.foreach {
case (key, value) => b.addEntries(ActiveActive.VersionVector.Entry.newBuilder().setKey(key).setVersion(value))
case (key, value) =>
b.addEntries(ReplicatedEventSourcing.VersionVector.Entry.newBuilder().setKey(key).setVersion(value))
}
b.build()
}
def versionVectorFromBinary(bytes: Array[Byte]): VersionVector =
versionVectorFromProto(ActiveActive.VersionVector.parseFrom(bytes))
versionVectorFromProto(ReplicatedEventSourcing.VersionVector.parseFrom(bytes))
def versionVectorFromProto(versionVector: ActiveActive.VersionVector): VersionVector = {
def versionVectorFromProto(versionVector: ReplicatedEventSourcing.VersionVector): VersionVector = {
val entries = versionVector.getEntriesList
if (entries.isEmpty)
VersionVector.empty
@ -311,7 +331,7 @@ object ActiveActiveSerializer {
}
def replicatedEventMetadataFromBinary(bytes: Array[Byte]): ReplicatedEventMetadata = {
val parsed = ActiveActive.ReplicatedEventMetadata.parseFrom(bytes)
val parsed = ReplicatedEventSourcing.ReplicatedEventMetadata.parseFrom(bytes)
ReplicatedEventMetadata(
ReplicaId(parsed.getOriginReplica),
parsed.getOriginSequenceNr,
@ -320,7 +340,7 @@ object ActiveActiveSerializer {
}
def replicatedSnapshotMetadataFromBinary(bytes: Array[Byte]): ReplicatedSnapshotMetadata = {
val parsed = ActiveActive.ReplicatedSnapshotMetadata.parseFrom(bytes)
val parsed = ReplicatedEventSourcing.ReplicatedSnapshotMetadata.parseFrom(bytes)
ReplicatedSnapshotMetadata(
versionVectorFromProto(parsed.getVersion),
parsed.getSeenPerReplicaList.asScala.map(seen => ReplicaId(seen.getReplicaId) -> seen.getSequenceNr).toMap)

View file

@ -13,12 +13,12 @@ import akka.persistence.typed.internal.ReplicatedSnapshotMetadata
import akka.persistence.typed.internal.VersionVector
import org.scalatest.wordspec.AnyWordSpecLike
class ActiveActiveSerializationSpec
class ReplicatedEventSourcingSerializationSpec
extends ScalaTestWithActorTestKit(ClusterSingletonPersistenceSpec.config)
with AnyWordSpecLike
with LogCapturing {
"The ActiveActive components that needs to be serializable" must {
"The Replicated Event Sourcing components that needs to be serializable" must {
"be serializable" in {
serializationTestKit.verifySerialization(

View file

@ -67,7 +67,7 @@ class EventSourcedBehaviorWatchSpec
holdingRecoveryPermit = false,
settings = settings,
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
activeActive = None,
replication = None,
publishEvents = false)
"A typed persistent parent actor watching a child" must {

View file

@ -11,7 +11,7 @@ import scala.runtime.AbstractFunction3
* @param persistenceId id of persistent actor from which the snapshot was taken.
* @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
* @param metadata a journal can optionally support persisting metadata separate to the domain state, used for active active support
* @param metadata a journal can optionally support persisting metadata separate to the domain state, used for Replicated Event Sourcing support
*/
@SerialVersionUID(1L)
final class SnapshotMetadata(