diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala index fb206cb73f..37654f6bb6 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala @@ -46,6 +46,17 @@ trait ReplicatedShardingExtension extends Extension { * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] */ def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] + + /** + * Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those. + * + * @param thisReplica If provided saves messages being forwarded to sharding for this replica + * @tparam M The type of messages the replicated event sourced actor accepts + * @tparam E The type of envelope used for routing messages to actors, the same for all replicas + * + * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] + */ + def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala index 7904aa6cc9..47e228ecb7 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala @@ -9,15 +9,14 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.persistence.typed.PublishedEvent import akka.persistence.typed.ReplicaId -import akka.util.ccompat.JavaConverters._ - /** + * INTERNAL API + * * Used when sharding Replicated Event Sourced entities in multiple instances of sharding, for example one per DC in a Multi DC * Akka Cluster. * @@ -35,8 +34,8 @@ import akka.util.ccompat.JavaConverters._ * The events are forwarded as [[akka.cluster.sharding.typed.ShardingEnvelope]] this will work out of the box both * by default and with a custom extractor since the envelopes are handled internally. */ -@ApiMayChange -object ShardingDirectReplication { +@InternalApi +private[akka] object ShardingDirectReplication { /** * Not for user extension @@ -52,42 +51,7 @@ object ShardingDirectReplication { private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command - /** - * Java API: - * Factory for when the self replica id is unknown (or multiple) - * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system - */ - def create[T](replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] = - apply(None, replicaShardingProxies.asScala.toMap) - - /** - * Java API: - * @param selfReplica The replica id of the replica that runs on this node - * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system - */ - def create[T]( - selfReplica: ReplicaId, - replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] = - apply(Some(selfReplica), replicaShardingProxies.asScala.toMap) - - /** - * Scala API: - * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system - */ - def apply[T](replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = - apply(None, replicaShardingProxies) - - /** - * Scala API: - * @param selfReplica The replica id of the replica that runs on this node - * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system - */ - def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = - apply(Some(selfReplica), replicaShardingProxies) - - private def apply[T]( - selfReplica: Option[ReplicaId], - replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = + def apply[T](selfReplica: Option[ReplicaId], replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = Behaviors.setup[Command] { context => context.log.debug( "Subscribing to event stream to forward events to [{}] sharded replicas", diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala index f8f3e2383f..a9f218a0a3 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala @@ -34,7 +34,15 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] private val logger = LoggerFactory.getLogger(getClass) - override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = { + override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + initInternal(None, settings) + + override def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + initInternal(Some(thisReplica), settings) + + private def initInternal[M, E]( + thisReplica: Option[ReplicaId], + settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = { val sharding = ClusterSharding(system) val initializedReplicas = settings.replicas.map { case (replicaSettings, typeName) => @@ -57,7 +65,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] if (settings.directReplication) { logger.infoN("Starting Replicated Event Sourcing Direct Replication") system.systemActorOf( - ShardingDirectReplication(replicaToRegionOrProxy), + ShardingDirectReplication(thisReplica, replicaToRegionOrProxy), s"directReplication-${counter.incrementAndGet()}") } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala index 550ca2e296..ae19cb35ad 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala @@ -26,7 +26,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit val replicationActor = spawn( ShardingDirectReplication( - typed.ReplicaId("ReplicaA"), + Some(typed.ReplicaId("ReplicaA")), replicaShardingProxies = Map( ReplicaId("ReplicaA") -> replicaAProbe.ref, ReplicaId("ReplicaB") -> replicaBProbe.ref, diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index 96760fd284..8a9413d935 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -384,18 +384,15 @@ with a single stream of tagged events from all replicas without duplicates. ## Direct Replication of Events -Normally an event has to be written in the journal and then picked up by the trailing read journal in the other replicas. -As an optimization the replicated events can be published across the Akka cluster to the replicas. The read side -query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most +In addition to reading each replica's events from the database the replicated events are published across the Akka cluster to the replicas when used with Cluster Sharding. +The query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most events will arrive at the replicas through the cluster. -To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing` -and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedEntityProvider] (if not using - replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor). +This feature is enabled by default when using sharding. +To disable this feature you first need to disable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing` +and then disable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedEntityProvider] The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written, -the @apidoc[ShardingDirectReplication] actor subscribes to these events and forwards them to the replicas allowing them -to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)). ## Hot Standby