Replicated Sharding improvements (#29483)

* WIP

* Finishing touches to sharding updates

* Review feedback
This commit is contained in:
Christopher Batey 2020-08-13 11:27:00 +01:00
parent 779e827495
commit 849018b81e
38 changed files with 615 additions and 266 deletions

View file

@ -16,6 +16,7 @@ import java.util.{ Set => JSet }
import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.persistence.typed.ReplicationId.Separator
@ApiMayChange
object ReplicatedEntityProvider {
@ -28,29 +29,30 @@ object ReplicatedEntityProvider {
*/
def create[M, E](
messageClass: Class[M],
typeName: String,
allReplicaIds: JSet[ReplicaId],
settingsPerReplicaFactory: akka.japi.function.Function3[
JEntityTypeKey[M],
ReplicaId,
JSet[ReplicaId],
ReplicatedEntity[M, E]]): ReplicatedEntityProvider[M, E] = {
settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M, E]])
: ReplicatedEntityProvider[M, E] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) =>
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds))
apply[M, E](typeName, allReplicaIds.asScala.toSet)((key, replica) =>
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica))
}
/**
* Scala API:
*
* @param typeName The type name used in the [[EntityTypeKey]]
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicatedEntity[M, E])
def apply[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M, E])
: ReplicatedEntityProvider[M, E] = {
new ReplicatedEntityProvider(allReplicaIds.map { replicaId =>
val typeKey = EntityTypeKey[M](replicaId.id)
settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds)
if (typeName.contains(Separator))
throw new IllegalArgumentException(s"typeName [$typeName] contains [$Separator] which is a reserved character")
val typeKey = EntityTypeKey[M](s"$typeName${Separator}${replicaId.id}")
(settingsPerReplicaFactory(typeKey, replicaId), typeName)
}.toVector, directReplication = false)
}
}
@ -61,7 +63,7 @@ object ReplicatedEntityProvider {
*/
@ApiMayChange
final class ReplicatedEntityProvider[M, E] private (
val replicas: immutable.Seq[ReplicatedEntity[M, E]],
val replicas: immutable.Seq[(ReplicatedEntity[M, E], String)],
val directReplication: Boolean) {
/**

View file

@ -16,12 +16,12 @@ import akka.cluster.sharding.typed.ReplicatedEntityProvider
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import org.slf4j.LoggerFactory
import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.typed.ShardingDirectReplication
import akka.persistence.typed.ReplicationId
import akka.util.ccompat.JavaConverters._
/**
@ -36,17 +36,23 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = {
val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map { replicaSettings =>
// start up a sharding instance per replica id
logger.infoN(
"Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])",
replicaSettings.replicaId.id,
replicaSettings.entity.typeKey.name)
val regionOrProxy = sharding.init(replicaSettings.entity)
(replicaSettings.replicaId, replicaSettings.entity.typeKey, regionOrProxy)
val initializedReplicas = settings.replicas.map {
case (replicaSettings, typeName) =>
// start up a sharding instance per replica id
logger.infoN(
"Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])",
replicaSettings.replicaId.id,
replicaSettings.entity.typeKey.name)
val regionOrProxy = sharding.init(replicaSettings.entity)
(
typeName,
replicaSettings.replicaId,
replicaSettings.entity.typeKey,
regionOrProxy,
replicaSettings.entity.dataCenter)
}
val replicaToRegionOrProxy = initializedReplicas.map {
case (id, _, regionOrProxy) => id -> regionOrProxy
case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy
}.toMap
if (settings.directReplication) {
logger.infoN("Starting Replicated Event Sourcing Direct Replication")
@ -55,7 +61,9 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
s"directReplication-${counter.incrementAndGet()}")
}
val replicaToTypeKey = initializedReplicas.map { case (id, typeKey, _) => id -> typeKey }.toMap
val replicaToTypeKey = initializedReplicas.map {
case (typeName, id, typeKey, _, dc) => id -> ((typeKey, dc, typeName))
}.toMap
new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey)
}
}
@ -67,16 +75,21 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
private[akka] final class ReplicatedShardingImpl[M, E](
sharding: ClusterSharding,
shardingPerReplica: Map[ReplicaId, ActorRef[E]],
replicaTypeKeys: Map[ReplicaId, EntityTypeKey[M]])
replicaTypeKeys: Map[ReplicaId, (EntityTypeKey[M], Option[DataCenter], String)])
extends ReplicatedSharding[M, E] {
// FIXME add test coverage for these
override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica
override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava
override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] =
replicaTypeKeys.map {
case (replicaId, typeKey) =>
replicaId -> sharding.entityRefFor(typeKey, PersistenceId.ofUniqueId(entityId).id)
case (replicaId, (typeKey, dc, typeName)) =>
replicaId -> (dc match {
case None => sharding.entityRefFor(typeKey, ReplicationId(typeName, entityId, replicaId).persistenceId.id)
case Some(dc) =>
sharding.entityRefFor(typeKey, ReplicationId(typeName, entityId, replicaId).persistenceId.id, dc)
})
}
override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] =

View file

@ -21,6 +21,7 @@ import akka.cluster.typed.Join;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.javadsl.*;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
@ -32,6 +33,7 @@ import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static akka.cluster.sharding.typed.ReplicatedShardingTest.ProxyActor.ALL_REPLICAS;
import static org.junit.Assert.assertEquals;
public class ReplicatedShardingTest extends JUnitSuite {
@ -64,13 +66,10 @@ public class ReplicatedShardingTest extends JUnitSuite {
}
}
static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
static Behavior<Command> create(ReplicationId replicationId) {
return ReplicatedEventSourcing.withSharedJournal(
"StringSet",
entityId,
replicaId,
allReplicas,
replicationId,
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
MyReplicatedStringSet::new);
}
@ -157,9 +156,10 @@ public class ReplicatedShardingTest extends JUnitSuite {
replicatedEntityProvider =
ReplicatedEntityProvider.create(
MyReplicatedStringSet.Command.class,
"StringSet",
ALL_REPLICAS,
// factory for replicated entity for a given replica
(entityTypeKey, replicaId, allReplicas) ->
(entityTypeKey, replicaId) ->
ReplicatedEntity.create(
replicaId,
// use the replica id as typekey for sharding to get one sharding instance
@ -169,7 +169,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
entityContext ->
// factory for the entity for a given entity in that replica
MyReplicatedStringSet.create(
entityContext.getEntityId(), replicaId, allReplicas))
ReplicationId.fromString(entityContext.getEntityId())))
// potentially use replica id as role or dc in Akka multi dc for the
// sharding instance
// to control where replicas will live

View file

@ -0,0 +1,103 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.cluster.sharding.typed.*;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.scaladsl.EntityRef;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import java.util.*;
public class ReplicatedShardingCompileOnlySpec {
private static ActorSystem<?> system = null;
interface Command {}
public static Behavior<Command> myEventSourcedBehavior(ReplicationId replicationId) {
return null;
}
public static final Set<ReplicaId> ALL_REPLICAS =
Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C"))));
public static ReplicatedEntityProvider<Command, ShardingEnvelope<Command>> provider() {
// #bootstrap
return ReplicatedEntityProvider.create(
Command.class,
"MyReplicatedType",
ALL_REPLICAS,
(entityTypeKey, replicaId) ->
ReplicatedEntity.create(
replicaId,
Entity.of(
entityTypeKey,
entityContext ->
myEventSourcedBehavior(
ReplicationId.fromString(entityContext.getEntityId())))));
// #bootstrap
}
public static void dc() {
// #bootstrap-dc
ReplicatedEntityProvider.create(
Command.class,
"MyReplicatedType",
ALL_REPLICAS,
(entityTypeKey, replicaId) ->
ReplicatedEntity.create(
replicaId,
Entity.of(
entityTypeKey,
entityContext ->
myEventSourcedBehavior(
ReplicationId.fromString(entityContext.getEntityId())))
.withDataCenter(replicaId.id())));
// #bootstrap-dc
}
public static ReplicatedEntityProvider<Command, ShardingEnvelope<Command>> role() {
// #bootstrap-role
return ReplicatedEntityProvider.create(
Command.class,
"MyReplicatedType",
ALL_REPLICAS,
(entityTypeKey, replicaId) ->
ReplicatedEntity.create(
replicaId,
Entity.of(
entityTypeKey,
entityContext ->
myEventSourcedBehavior(
ReplicationId.fromString(entityContext.getEntityId())))
.withRole(replicaId.id())));
// #bootstrap-role
}
public static void sendingMessages() {
// #sending-messages
ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system);
ReplicatedSharding<Command, ShardingEnvelope<Command>> replicatedSharding =
extension.init(provider());
Map<ReplicaId, EntityRef<Command>> myEntityId =
replicatedSharding.getEntityRefsFor("myEntityId");
Map<ReplicaId, ActorRef<ShardingEnvelope<Command>>> shardingRefs =
replicatedSharding.getShardingRefs();
// #sending-messages
}
}

View file

@ -10,10 +10,10 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.eventstream.EventStream
import akka.persistence.typed
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
@ -37,7 +37,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit
upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough
val event = PublishedEventImpl(
PersistenceId.replicatedId("ReplicatedShardingSpec", "pid", ReplicaId("ReplicaA")),
ReplicationId("ReplicatedShardingSpec", "pid", ReplicaId("ReplicaA")).persistenceId,
1L,
"event",
System.currentTimeMillis(),

View file

@ -6,12 +6,18 @@ package akka.cluster.sharding.typed
import java.util.concurrent.ThreadLocalRandom
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.ReplicatedShardingSpec.DataCenter
import akka.cluster.sharding.typed.ReplicatedShardingSpec.Normal
import akka.cluster.sharding.typed.ReplicatedShardingSpec.ReplicationType
import akka.cluster.sharding.typed.ReplicatedShardingSpec.Role
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
@ -24,35 +30,54 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.sharding.typed.ReplicatedShardingSpec.MyReplicatedIntSet
import akka.cluster.sharding.typed.ReplicatedShardingSpec.MyReplicatedStringSet
import akka.persistence.typed.ReplicationId
import com.typesafe.config.Config
object ReplicatedShardingSpec {
def config = ConfigFactory.parseString("""
akka.loglevel = DEBUG
def commonConfig = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = "cluster"
# pretend we're a node in all dc:s
akka.cluster.roles = ["DC-A", "DC-B", "DC-C"]
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0""").withFallback(PersistenceTestKitPlugin.config)
}
class ReplicatedShardingSpec
extends ScalaTestWithActorTestKit(ReplicatedShardingSpec.config)
with AnyWordSpecLike
with LogCapturing {
def roleAConfig = ConfigFactory.parseString("""
akka.cluster.roles = ["DC-A"]
""".stripMargin).withFallback(commonConfig)
def roleBConfig = ConfigFactory.parseString("""
akka.cluster.roles = ["DC-B"]
""".stripMargin).withFallback(commonConfig)
def dcAConfig = ConfigFactory.parseString("""
akka.cluster.multi-data-center.self-data-center = "DC-A"
""").withFallback(commonConfig)
def dcBConfig = ConfigFactory.parseString("""
akka.cluster.multi-data-center.self-data-center = "DC-B"
""").withFallback(commonConfig)
sealed trait ReplicationType
case object Role extends ReplicationType
case object DataCenter extends ReplicationType
case object Normal extends ReplicationType
val AllReplicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B"))
object MyReplicatedStringSet {
trait Command extends CborSerializable
case class Add(text: String) extends Command
case class GetTexts(replyTo: ActorRef[Texts]) extends Command
case class Texts(texts: Set[String]) extends CborSerializable
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal(
"StringSet",
entityId,
replicaId,
allReplicas,
def apply(replicationId: ReplicationId): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory
replicationId,
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, String, Set[String]](
replicationContext.persistenceId,
@ -65,81 +90,205 @@ class ReplicatedShardingSpec
replyTo ! Texts(state)
Effect.none
},
(state, event) => state + event).withJournalPluginId(PersistenceTestKitPlugin.PluginId)
(state, event) => state + event)
.withJournalPluginId(PersistenceTestKitPlugin.PluginId)
.withEventPublishing(true)
}
def provider(replicationType: ReplicationType) =
ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
// all replicas
"StringSet",
AllReplicas) { (entityTypeKey, replicaId) =>
// factory for replicated entity for a given replica
val entity = {
val e = Entity(entityTypeKey) { entityContext =>
MyReplicatedStringSet(ReplicationId.fromString(entityContext.entityId))
}
replicationType match {
case Role =>
e.withRole(replicaId.id)
case DataCenter =>
e.withDataCenter(replicaId.id)
case Normal =>
e
}
}
ReplicatedEntity(replicaId, entity)
}.withDirectReplication(true)
}
object ProxyActor {
sealed trait Command
case class ForwardToRandom(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
case class ForwardToAll(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
object MyReplicatedIntSet {
trait Command extends CborSerializable
case class Add(text: Int) extends Command
case class GetInts(replyTo: ActorRef[Ints]) extends Command
case class Ints(ints: Set[Int]) extends CborSerializable
def apply(): Behavior[Command] = Behaviors.setup { context =>
// #bootstrap
val replicatedShardingProvider =
ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
// all replicas
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
// factory for replicated entity for a given replica
ReplicatedEntity(
replicaId,
// use the provided entity type key for sharding to get one sharding instance per replica
Entity(entityTypeKey) { entityContext =>
// factory for the entity for a given entity in that replica
MyReplicatedStringSet(entityContext.entityId, replicaId, allReplicaIds)
}
// potentially use replica id as role or dc in Akka multi dc for the sharding instance
// to control where replicas will live
// .withDataCenter(replicaId.id))
.withRole(replicaId.id))
def apply(id: ReplicationId, allReplicas: Set[ReplicaId]): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory
id,
allReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Int, Set[Int]](
replicationContext.persistenceId,
Set.empty[Int],
(state, command) =>
command match {
case Add(int) =>
Effect.persist(int)
case GetInts(replyTo) =>
replyTo ! Ints(state)
Effect.none
},
(state, event) => state + event)
.withJournalPluginId(PersistenceTestKitPlugin.PluginId)
.withEventPublishing(true)
}
def provider(replicationType: ReplicationType) =
ReplicatedEntityProvider[MyReplicatedIntSet.Command, ShardingEnvelope[MyReplicatedIntSet.Command]](
"IntSet",
AllReplicas) { (entityTypeKey, replicaId) =>
val entity = {
val e = Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)
MyReplicatedIntSet(replicationId, AllReplicas)
}
replicationType match {
case Role =>
e.withRole(replicaId.id)
case DataCenter =>
e.withDataCenter(replicaId.id)
case Normal =>
e
}
}
ReplicatedEntity(replicaId, entity)
}.withDirectReplication(true)
}
}
val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingProvider)
// #bootstrap
object ProxyActor {
sealed trait Command
case class ForwardToRandomString(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
case class ForwardToAllString(entityId: String, msg: MyReplicatedStringSet.Command) extends Command
case class ForwardToRandomInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command
case class ForwardToAllInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command
def apply(replicationType: ReplicationType): Behavior[Command] = Behaviors.setup { context =>
val replicatedShardingStringSet =
ReplicatedShardingExtension(context.system).init(MyReplicatedStringSet.provider(replicationType))
val replicatedShardingIntSet =
ReplicatedShardingExtension(context.system).init(MyReplicatedIntSet.provider(replicationType))
Behaviors.setup { ctx =>
Behaviors.receiveMessage {
case ForwardToAll(entityId, cmd) =>
// #all-entity-refs
replicatedSharding.entityRefsFor(entityId).foreach {
case ForwardToAllString(entityId, cmd) =>
val entityRefs = replicatedShardingStringSet.entityRefsFor(entityId)
ctx.log.infoN("Entity refs {}", entityRefs)
entityRefs.foreach {
case (replica, ref) =>
ctx.log.infoN("Forwarding to replica {} ref {}", replica, ref)
ref ! cmd
}
Behaviors.same
case ForwardToRandomString(entityId, cmd) =>
val refs = replicatedShardingStringSet.entityRefsFor(entityId)
val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size)
val chosen = refs.values.toIndexedSeq(chosenIdx)
ctx.log.info("Forwarding to {}", chosen)
chosen ! cmd
Behaviors.same
case ForwardToAllInt(entityId, cmd) =>
replicatedShardingIntSet.entityRefsFor(entityId).foreach {
case (_, ref) => ref ! cmd
}
// #all-entity-refs
Behaviors.same
case ForwardToRandom(entityId, cmd) =>
val refs = replicatedSharding.entityRefsFor(entityId)
case ForwardToRandomInt(entityId, cmd) =>
val refs =
replicatedShardingIntSet.entityRefsFor(entityId)
val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size)
refs.values.toIndexedSeq(chosenIdx) ! cmd;
refs.values.toIndexedSeq(chosenIdx) ! cmd
Behaviors.same
}
}
}
}
class NormalReplicatedShardingSpec
extends ReplicatedShardingSpec(Normal, ReplicatedShardingSpec.commonConfig, ReplicatedShardingSpec.commonConfig)
class RoleReplicatedShardingSpec
extends ReplicatedShardingSpec(Role, ReplicatedShardingSpec.roleAConfig, ReplicatedShardingSpec.roleBConfig)
class DataCenterReplicatedShardingSpec
extends ReplicatedShardingSpec(DataCenter, ReplicatedShardingSpec.dcAConfig, ReplicatedShardingSpec.dcBConfig)
abstract class ReplicatedShardingSpec(replicationType: ReplicationType, configA: Config, configB: Config)
extends ScalaTestWithActorTestKit(configA)
with AnyWordSpecLike
with LogCapturing {
val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = configB)
override protected def afterAll(): Unit = {
super.afterAll()
ActorTestKit.shutdown(
system2,
testKitSettings.DefaultActorSystemShutdownTimeout,
testKitSettings.ThrowOnShutdownTimeout)
}
"Replicated sharding" should {
"form a one node cluster" in {
val node = Cluster(system)
node.manager ! Join(node.selfMember.address)
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
Cluster(system2).manager ! Join(Cluster(system).selfMember.address)
eventually {
node.selfMember.status should ===(MemberStatus.Up)
Cluster(system).state.members.size should ===(2)
Cluster(system).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up))
}
eventually {
Cluster(system2).state.members.size should ===(2)
Cluster(system2).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up))
}
}
"forward to replicas" in {
val proxy = spawn(ProxyActor())
"start replicated sharding on both nodes" in {
def start(sys: ActorSystem[_]) = {
ReplicatedShardingExtension(sys).init(MyReplicatedStringSet.provider(replicationType))
ReplicatedShardingExtension(sys).init(MyReplicatedIntSet.provider(replicationType))
}
start(system)
start(system2)
}
proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandom("id1", MyReplicatedStringSet.Add("to-random"))
"forward to replicas" in {
val proxy = spawn(ProxyActor(replicationType))
proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandomString("id1", MyReplicatedStringSet.Add("to-random"))
eventually {
val probe = createTestProbe[MyReplicatedStringSet.Texts]()
proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.GetTexts(probe.ref))
val responses: Seq[MyReplicatedStringSet.Texts] = probe.receiveMessages(3)
proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.GetTexts(probe.ref))
val responses: Seq[MyReplicatedStringSet.Texts] = probe.receiveMessages(2)
val uniqueTexts = responses.flatMap(res => res.texts).toSet
uniqueTexts should ===(Set("to-all", "to-random"))
}
}
proxy ! ProxyActor.ForwardToAllInt("id1", MyReplicatedIntSet.Add(10))
proxy ! ProxyActor.ForwardToRandomInt("id1", MyReplicatedIntSet.Add(11))
eventually {
val probe = createTestProbe[MyReplicatedIntSet.Ints]()
proxy ! ProxyActor.ForwardToAllInt("id1", MyReplicatedIntSet.GetInts(probe.ref))
val responses: Seq[MyReplicatedIntSet.Ints] = probe.receiveMessages(2)
val uniqueTexts = responses.flatMap(res => res.ints).toSet
uniqueTexts should ===(Set(10, 11))
}
}
}
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.ReplicatedEntity
import akka.cluster.sharding.typed.ReplicatedEntityProvider
import akka.cluster.sharding.typed.ReplicatedSharding
import akka.cluster.sharding.typed.ReplicatedShardingExtension
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import com.github.ghik.silencer.silent
@silent("never used")
object ReplicatedShardingCompileOnlySpec {
sealed trait Command
val system: ActorSystem[_] = ???
object MyEventSourcedBehavior {
def apply(replicationId: ReplicationId): Behavior[Command] = ???
}
//#bootstrap
ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
// the sharding entity id contains the business entityId, entityType, and replica id
// which you'll need to create a ReplicatedEventSourcedBehavior
val replicationId = ReplicationId.fromString(entityContext.entityId)
MyEventSourcedBehavior(replicationId)
})
}
//#bootstrap
//#bootstrap-dc
ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)
MyEventSourcedBehavior(replicationId)
}.withDataCenter(replicaId.id))
}
//#bootstrap-dc
//#bootstrap-role
val provider = ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)
MyEventSourcedBehavior(replicationId)
}.withRole(replicaId.id))
}
//#bootstrap-role
//#sending-messages
val myReplicatedSharding: ReplicatedSharding[Command, ShardingEnvelope[Command]] =
ReplicatedShardingExtension(system).init(provider)
val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId")
val actorRefs: Map[ReplicaId, ActorRef[ShardingEnvelope[Command]]] = myReplicatedSharding.shardingRefs
//#sending-messages
}

View file

@ -632,7 +632,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
case null =>
proxies.get(proxyName(typeName, None)) match {
case null =>
throw new IllegalStateException(s"Shard type [$typeName] must be started first")
throw new IllegalStateException(
s"Shard type [$typeName] must be started first. Started ${regions.keySet()} proxies ${proxies.keySet()}")
case ref => ref
}
case ref => ref

View file

@ -41,9 +41,9 @@ class Member private[cluster] (
}
override def toString =
if (dataCenter == ClusterSettings.DefaultDataCenter)
s"Member(address = $address, status = $status)"
s"Member(address = $address, status = $status, roles = $roles)"
else
s"Member(address = $address, dataCenter = $dataCenter, status = $status)"
s"Member(address = $address, dataCenter = $dataCenter, status = $status, roles = $roles)"
def hasRole(role: String): Boolean = roles.contains(role)

View file

@ -298,27 +298,50 @@ When comparing two version vectors `v1` and `v2`:
## Sharded Replicated Event Sourced entities
To simplify what probably are the most common use cases for how you will want to distribute the replicated actors there is a minimal API for running multiple instances of @ref[Akka Cluster Sharding](cluster-sharding.md),
each instance holding the entities for a single replica.
There are three ways to integrate replicated event sourced entities with sharding:
The distribution of the replicas can be controlled either through cluster roles or using the @ref[multi datacenter](cluster-dc.md) support in Akka Cluster.
* Ensure that each replica has a unique entity id by using the replica id as part of the entity id
* Use @ref[multi datacenter](cluster-dc.md) to run a full copy of sharding per replica
* Use roles to run a full copy of sharding per replica
The API consists of bootstrapping logic for starting the sharding instances through @apidoc[ReplicatedShardingExtension] available from the
To simplify all three cases the @apidoc[ReplicatedShardingExtension] is available from the
`akka-cluster-sharding-typed` module.
Scala
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #bootstrap }
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap }
Java
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #bootstrap }
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap }
This will run a single instance of sharding and the replicas will be differentiated by having the replica id in the sharding entity id.
Replicas could be on the same node if they end up in the same shard or if the shards get allocated to the same node.
To prevent this roles can be used. You could for instance add a cluster role per availability zone / rack and have a replica per rack.
Scala
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap-role }
Java
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap-role }
Lastly if your Akka Cluster is setup across DCs you can run a replica per DC.
Scala
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap-dc }
Java
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap-dc }
Regardless of which replication strategy you use sending messages to the replicated entities is the same.
`init` returns an @apidoc[ReplicatedSharding] instance which gives access to @apidoc[EntityRef]s for each of the replicas for arbitrary routing logic:
Scala
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #all-entity-refs }
: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #sending-messages }
Java
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #all-entity-refs }
: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #sending-messages }
More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)).

View file

@ -81,9 +81,7 @@ public class ReplicatedEventSourcingTest extends JUnitSuite {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"ReplicatedEventSourcingTest",
entityId,
replicaId,
new ReplicationId("ReplicatedEventSourcingTest", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
TestBehavior::new);

View file

@ -6,6 +6,7 @@ package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.javadsl.*;
import java.util.*;
@ -33,9 +34,7 @@ public class MyReplicatedBehavior
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, String queryPluginId) {
return ReplicatedEventSourcing.withSharedJournal(
"MyReplicatedEntity",
entityId,
replicaId,
new ReplicationId("MyReplicatedEntity", entityId, replicaId),
ALL_REPLICAS,
queryPluginId,
MyReplicatedBehavior::new);
@ -49,9 +48,7 @@ public class MyReplicatedBehavior
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
return ReplicatedEventSourcing.create(
"MyReplicatedEntity",
entityId,
replicaId,
new ReplicationId("MyReplicatedEntity", entityId, replicaId),
allReplicasAndQueryPlugins,
MyReplicatedBehavior::new);
}

View file

@ -16,6 +16,7 @@ import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
@ -262,9 +263,7 @@ class AuctionEntity extends ReplicatedEventSourcedBehavior<Command, Event, Aucti
Behaviors.withTimers(
timers ->
ReplicatedEventSourcing.withSharedJournal(
"Auction",
name,
replica,
new ReplicationId("Auction", name, replica),
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
replicationCtx ->

View file

@ -11,6 +11,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.crdt.LwwTime;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
@ -187,9 +188,7 @@ interface ReplicatedBlogExample {
return Behaviors.setup(
context ->
ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
new ReplicationId("blog", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
replicationContext -> new BlogEntity(context, replicationContext)));

View file

@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.crdt.ORSet;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
@ -61,9 +62,7 @@ interface ReplicatedMovieExample {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"movies",
entityId,
replicaId,
new ReplicationId("movies", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
MovieWatchList::new);

View file

@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.crdt.Counter;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
@ -85,9 +86,7 @@ interface ReplicatedShoppingCartExample {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
new ReplicationId("blog", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
ShoppingCart::new);

View file

@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.javadsl.*;
import java.util.HashSet;
@ -27,9 +28,7 @@ public final class ReplicatedStringSet
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"StringSet",
entityId,
replicaId,
new ReplicationId("StringSet", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
ReplicatedStringSet::new);

View file

@ -33,9 +33,7 @@ object MultiJournalReplicationSpec {
private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal")
def apply(entityId: String, replicaId: String): Behavior[Command] = {
ReplicatedEventSourcing(
"MultiJournalSpec",
entityId,
ReplicaId(replicaId),
ReplicationId("MultiJournalSpec", entityId, ReplicaId(replicaId)),
Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))(
replicationContext =>
EventSourcedBehavior[Command, String, Set[String]](
@ -100,11 +98,17 @@ class MultiJournalReplicationSpec
val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query")
val eventsForJournal1 =
readJournal1.currentEventsByPersistenceId("id1|R1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
readJournal1
.currentEventsByPersistenceId("MultiJournalSpec|id1|R1", 0L, Long.MaxValue)
.runWith(Sink.seq)
.futureValue
eventsForJournal1.map(_.event).toSet should ===(Set("r1 m1", "r2 m1"))
val eventsForJournal2 =
readJournal2.currentEventsByPersistenceId("id1|R2", 0L, Long.MaxValue).runWith(Sink.seq).futureValue
readJournal2
.currentEventsByPersistenceId("MultiJournalSpec|id1|R2", 0L, Long.MaxValue)
.runWith(Sink.seq)
.futureValue
eventsForJournal2.map(_.event).toSet should ===(Set("r1 m1", "r2 m1"))
}

View file

@ -31,9 +31,7 @@ object ReplicatedEventPublishingSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
Behaviors.setup { ctx =>
ReplicatedEventSourcing.withSharedJournal(
EntityType,
entityId,
replicaId,
ReplicationId(EntityType, entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier)(
replicationContext =>
@ -86,7 +84,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two",
System.currentTimeMillis(),
@ -107,7 +105,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
2L, // missing 1L
"two",
System.currentTimeMillis(),
@ -128,7 +126,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCC),
ReplicationId(EntityType, id, DCC).persistenceId,
1L,
"two",
System.currentTimeMillis(),
@ -149,14 +147,14 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, "myId4", DCB),
ReplicationId(EntityType, "myId4", DCB).persistenceId,
1L,
"two",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two-again", // ofc this would be the same in the real world, different just so we can detect
System.currentTimeMillis(),
@ -188,7 +186,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two",
System.currentTimeMillis(),
@ -211,7 +209,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two",
System.currentTimeMillis(),
@ -224,7 +222,7 @@ class ReplicatedEventPublishingSpec
// simulate a published event from another replica
incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, id, DCB),
ReplicationId(EntityType, id, DCB).persistenceId,
2L,
"three",
System.currentTimeMillis(),

View file

@ -72,9 +72,7 @@ object ReplicatedEventSourcingSpec {
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal(
"ReplicatedEventSourcingSpec",
entityId,
ReplicaId(replicaId),
ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe))

View file

@ -42,7 +42,10 @@ object ReplicatedEventSourcingTaggingSpec {
replica: ReplicaId,
allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = {
// #tagging
ReplicatedEventSourcing.withSharedJournal("TaggingSpec", entityId, replica, allReplicas, queryPluginId)(
ReplicatedEventSourcing.withSharedJournal(
ReplicationId("TaggingSpec", entityId, replica),
allReplicas,
queryPluginId)(
replicationContext =>
EventSourcedBehavior[Command, String, State](
replicationContext.persistenceId,

View file

@ -29,9 +29,7 @@ object ReplicationIllegalAccessSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"IllegalAccessSpec",
entityId,
replica,
ReplicationId("IllegalAccessSpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(
replicationContext =>
@ -91,9 +89,7 @@ class ReplicationIllegalAccessSpec
"detect illegal access in the factory" in {
val exception = intercept[UnsupportedOperationException] {
ReplicatedEventSourcing.withSharedJournal(
"IllegalAccessSpec",
"id2",
R1,
ReplicationId("IllegalAccessSpec", "id2", R1),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
replicationContext.origin

View file

@ -20,7 +20,7 @@ object ReplicationSnapshotSpec {
import ReplicatedEventSourcingSpec._
val EntityType = "SpapsnotSpec"
val EntityType = "SnapshotSpec"
def behaviorWithSnapshotting(entityId: String, replicaId: ReplicaId): Behavior[Command] =
behaviorWithSnapshotting(entityId, replicaId, None)
@ -36,9 +36,7 @@ object ReplicationSnapshotSpec {
replicaId: ReplicaId,
probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
EntityType,
entityId,
replicaId,
ReplicationId(EntityType, entityId, replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(replicationContext =>
eventSourcedBehavior(replicationContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0))
@ -67,8 +65,8 @@ class ReplicationSnapshotSpec
"ReplicatedEventSourcing" should {
"recover state from snapshots" in {
val entityId = nextEntityId
val persistenceIdR1 = s"$entityId|R1"
val persistenceIdR2 = s"$entityId|R2"
val persistenceIdR1 = s"$EntityType|$entityId|R1"
val persistenceIdR2 = s"$EntityType|$entityId|R2"
val probe = createTestProbe[Done]()
val r2EventProbe = createTestProbe[EventAndContext]()
@ -84,7 +82,7 @@ class ReplicationSnapshotSpec
snapshotTestKit.expectNextPersisted(persistenceIdR2, State(List("r1 2", "r1 1")))
r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, entityId, R1),
ReplicationId(EntityType, entityId, R1).persistenceId,
1L,
"two-again",
System.currentTimeMillis(),
@ -98,7 +96,7 @@ class ReplicationSnapshotSpec
{
val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref))
r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedId(EntityType, entityId, R1),
ReplicationId(EntityType, entityId, R1).persistenceId,
1L,
"two-again",
System.currentTimeMillis(),
@ -109,8 +107,6 @@ class ReplicationSnapshotSpec
r2 ! GetState(stateProbe.ref)
stateProbe.expectMessage(State(List("r1 2", "r1 1")))
}
}
}
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.crdt
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.crdt.CounterSpec.PlainCounter.{ Decrement, Get, Increment }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
@ -29,9 +30,7 @@ object CounterSpec {
eventProbe: Option[ActorRef[Counter.Updated]] = None) =
Behaviors.setup[PlainCounter.Command] { context =>
ReplicatedEventSourcing.withSharedJournal(
"CounterSpec",
entityId,
replicaId,
ReplicationId("CounterSpec", entityId, replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { ctx =>
EventSourcedBehavior[PlainCounter.Command, Counter.Updated, Counter](

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.crdt
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
import akka.serialization.jackson.CborSerializable
@ -27,9 +28,7 @@ object LwwSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"LwwRegistrySpec",
entityId,
replica,
ReplicationId("LwwRegistrySpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, Registry](

View file

@ -10,6 +10,7 @@ import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, Replicate
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec }
import ORSetSpec.ORSetEntity._
import akka.persistence.typed.ReplicationBaseSpec.{ R1, R2 }
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.crdt.ORSetSpec.ORSetEntity
import scala.util.Random
@ -28,9 +29,7 @@ object ORSetSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = {
ReplicatedEventSourcing.withSharedJournal(
"ORSetSpec",
entityId,
replica,
ReplicationId("ORSetSpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](

View file

@ -23,6 +23,7 @@ import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
@ -142,12 +143,13 @@ object ReplicatedAuctionExampleSpec {
responsibleForClosing: Boolean,
allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers { timers =>
ReplicatedEventSourcing
.withSharedJournal("auction", name, replica, allReplicas, PersistenceTestKitReadJournal.Identifier) {
replicationCtx =>
new AuctionEntity(ctx, replicationCtx, timers, closingAt, responsibleForClosing, allReplicas)
.behavior(initialBid)
}
ReplicatedEventSourcing.withSharedJournal(
ReplicationId("auction", name, replica),
allReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationCtx =>
new AuctionEntity(ctx, replicationCtx, timers, closingAt, responsibleForClosing, allReplicas)
.behavior(initialBid)
}
}
}

View file

@ -16,6 +16,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.crdt.LwwTime
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
@ -52,9 +53,7 @@ object ReplicatedBlogExampleSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
Behaviors.setup[Command] { ctx =>
ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
ReplicationId("blog", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, BlogState](

View file

@ -5,6 +5,7 @@
package docs.akka.persistence.typed
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, ReplicatedEventSourcing }
import com.github.ghik.silencer.silent
@ -24,15 +25,19 @@ object ReplicatedEventSourcingCompileOnlySpec {
trait Event
//#factory-shared
ReplicatedEventSourcing.withSharedJournal("entityTypeHint", "entityId", DCA, AllReplicas, queryPluginId) { context =>
ReplicatedEventSourcing.withSharedJournal(
ReplicationId("entityTypeHint", "entityId", DCA),
AllReplicas,
queryPluginId) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
//#factory-shared
//#factory
ReplicatedEventSourcing("entityTypeHint", "entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) {
context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
ReplicatedEventSourcing(
ReplicationId("entityTypeHint", "entityId", DCA),
Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
//#factory

View file

@ -5,7 +5,6 @@
package docs.akka.persistence.typed
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
@ -13,6 +12,7 @@ import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.crdt.ORSet
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -29,9 +29,7 @@ object ReplicatedMovieWatchListExampleSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"movies",
entityId,
replicaId,
ReplicationId("movies", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](

View file

@ -8,7 +8,6 @@ import java.util.UUID
import docs.akka.persistence.typed.ReplicatedShoppingCartExampleSpec.ShoppingCart.CartItems
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
@ -16,6 +15,7 @@ import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.crdt.Counter
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -42,9 +42,7 @@ object ReplicatedShoppingCartExampleSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
ReplicationId("blog", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, State](

View file

@ -3,7 +3,6 @@
*/
package akka.persistence.typed
import akka.annotation.ApiMayChange
object PersistenceId {
@ -125,27 +124,6 @@ object PersistenceId {
*/
def ofUniqueId(id: String): PersistenceId =
new PersistenceId(id)
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint`, `entityId` and `replicaId` by
* concatenating them with the `|` separator.
*/
@ApiMayChange
def replicatedId(entityTypeHint: String, entityId: String, replicaId: ReplicaId): PersistenceId = {
if (entityTypeHint.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"entityTypeHint [$entityTypeHint] contains [$DefaultSeparator] which is a reserved character")
if (entityId.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character")
if (replicaId.id.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"replicaId [${replicaId.id}] contains [$DefaultSeparator] which is a reserved character")
new PersistenceId(entityTypeHint + DefaultSeparator + entityId + DefaultSeparator + replicaId.id)
}
}
/**

View file

@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
object ReplicationId {
private[akka] val Separator = "|"
def fromString(id: String): ReplicationId = {
val split = id.split("\\|")
require(split.size == 3, s"invalid replication id $id")
ReplicationId(split(0), split(1), ReplicaId(split(2)))
}
/**
* @param typeName The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param entityId The unique entity id
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
*/
def apply(typeName: String, entityId: String, replicaId: ReplicaId): ReplicationId =
new ReplicationId(typeName, entityId, replicaId)
}
/**
* @param typeName The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param entityId The unique entity id
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
*/
final class ReplicationId(val typeName: String, val entityId: String, val replicaId: ReplicaId) {
import ReplicationId._
if (typeName.contains(Separator))
throw new IllegalArgumentException(
s"entityTypeHint [$typeName] contains [$Separator] which is a reserved character")
if (entityId.contains(Separator))
throw new IllegalArgumentException(s"entityId [$entityId] contains [$Separator] which is a reserved character")
if (replicaId.id.contains(Separator))
throw new IllegalArgumentException(
s"replicaId [${replicaId.id}] contains [$Separator] which is a reserved character")
private val id: String = s"$typeName$Separator$entityId$Separator${replicaId.id}"
def persistenceId: PersistenceId = PersistenceId.ofUniqueId(id)
}

View file

@ -257,7 +257,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
override private[akka] def withReplication(
context: ReplicationContextImpl): EventSourcedBehavior[Command, Event, State] = {
copy(replication = Some(ReplicationSetup(context.replicaId, context.replicasAndQueryPlugins, context)))
copy(
replication = Some(ReplicationSetup(context.replicationId.replicaId, context.replicasAndQueryPlugins, context)))
}
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.util.OptionVal
import akka.util.WallClock
import akka.util.ccompat.JavaConverters._
@ -16,14 +17,11 @@ import akka.util.ccompat.JavaConverters._
*/
@InternalApi
private[akka] final class ReplicationContextImpl(
val entityTypeHint: String,
val entityId: String,
val replicaId: ReplicaId,
val replicationId: ReplicationId,
val replicasAndQueryPlugins: Map[ReplicaId, String])
extends akka.persistence.typed.scaladsl.ReplicationContext
with akka.persistence.typed.javadsl.ReplicationContext {
val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet
// these are not volatile as they are set on the same thread as they should be accessed
var _currentThread: OptionVal[Thread] = OptionVal.None
var _origin: OptionVal[ReplicaId] = OptionVal.None
@ -65,7 +63,7 @@ private[akka] final class ReplicationContextImpl(
_concurrent
}
override def persistenceId: PersistenceId = PersistenceId.replicatedId(entityTypeHint, entityId, replicaId)
override def persistenceId: PersistenceId = replicationId.persistenceId
override def currentTimeMillis(): Long = {
WallClock.AlwaysIncreasingClock.currentTimeMillis()

View file

@ -35,6 +35,7 @@ import akka.persistence.journal.Tagged
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.{
DeleteEventsCompleted,
DeleteEventsFailed,
@ -128,8 +129,8 @@ private[akka] object Running {
val query = PersistenceQuery(system)
replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != replicationSetup.replicaId) {
val pid = PersistenceId.replicatedId(
replicationSetup.replicationContext.entityTypeHint,
val pid = ReplicationId(
replicationSetup.replicationContext.replicationId.typeName,
replicationSetup.replicationContext.entityId,
replicaId)
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
@ -147,7 +148,7 @@ private[akka] object Running {
Source.futureSource {
setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
.eventsByPersistenceId(pid.persistenceId.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(event =>

View file

@ -11,8 +11,8 @@ import java.util.{ Map => JMap }
import akka.annotation.DoNotInherit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.util.ccompat.JavaConverters._
/**
@ -23,6 +23,8 @@ import akka.util.ccompat.JavaConverters._
@DoNotInherit
trait ReplicationContext {
def replicationId: ReplicationId
/**
* @return The replica id of this replicated event sourced actor
*/
@ -81,25 +83,15 @@ object ReplicatedEventSourcing {
* can be used for each replica.
* The events from other replicas are read using PersistentQuery.
*
* @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
*/
def withSharedJournal[Command, Event, State](
entityType: String,
entityId: String,
replicaId: ReplicaId,
replicationId: ReplicationId,
allReplicaIds: JSet[ReplicaId],
queryPluginId: String,
behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])
: EventSourcedBehavior[Command, Event, State] =
create(
entityType,
entityId,
replicaId,
allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava,
behaviorFactory)
create(replicationId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory)
/**
* Initialize a replicated event sourced behavior.
@ -113,19 +105,15 @@ object ReplicatedEventSourcing {
* A query side identifier is passed per replica allowing for separate database/journal configuration per
* replica. The events from other replicas are read using PersistentQuery.
*
* @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
* and configured with the query plugin for the journal that each replica uses.
*/
def create[Command, Event, State](
entityType: String,
entityId: String,
replicaId: ReplicaId,
replicationId: ReplicationId,
allReplicasAndQueryPlugins: JMap[ReplicaId, String],
eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ReplicationContextImpl(entityType, entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap)
val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins.asScala.toMap)
eventSourcedBehaviorFactory(context)
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.scaladsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicationContextImpl
/**
@ -17,6 +18,8 @@ import akka.persistence.typed.internal.ReplicationContextImpl
@DoNotInherit
trait ReplicationContext {
def replicationId: ReplicationId
/**
* @return The unique id of this replica, including the replica id
*/
@ -25,7 +28,7 @@ trait ReplicationContext {
/**
* @return The replica id of this replicated event sourced actor
*/
def replicaId: ReplicaId
def replicaId: ReplicaId = replicationId.replicaId
/**
* @return The ids of all replicas of this replicated event sourced actor
@ -35,7 +38,7 @@ trait ReplicationContext {
/**
* @return The entity id of this replicated event sourced actor (not including the replica id)
*/
def entityId: String
def entityId: String = replicationId.entityId
/**
* Must only be called from the event handler
@ -76,22 +79,16 @@ object ReplicatedEventSourcing {
* can be used for each replica.
* The events from other replicas are read using PersistentQuery.
*
* @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param entityId The unique entity id
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
*/
def withSharedJournal[Command, Event, State](
entityType: String,
entityId: String,
replicaId: ReplicaId,
replicationId: ReplicationId,
allReplicaIds: Set[ReplicaId],
queryPluginId: String)(
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] =
apply(entityType, entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)(
eventSourcedBehaviorFactory)
apply(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory)
/**
* Initialize a replicated event sourced behavior.
@ -104,21 +101,13 @@ object ReplicatedEventSourcing {
* The journal plugin id for the entity itself can be configured using withJournalPluginId after creation.
* A query side identifier is passed per replica allowing for separate database/journal configuration per
* replica. The events from other replicas are read using PersistentQuery.
*
* @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities
* @param entityId The unique entity id
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
* and configured with the query plugin for the journal that each replica uses.
*/
def apply[Command, Event, State](
entityType: String,
entityId: String,
replicaId: ReplicaId,
allReplicasAndQueryPlugins: Map[ReplicaId, String])(
def apply[Command, Event, State](replicationId: ReplicationId, allReplicasAndQueryPlugins: Map[ReplicaId, String])(
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ReplicationContextImpl(entityType, entityId, replicaId, allReplicasAndQueryPlugins)
val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins)
eventSourcedBehaviorFactory(context).withReplication(context)
}