Make ShardingDirectReplication private (#29492)

* Make ShardingDirectReplication private

And provider user API to provide ReplicaId so that local messages don't
need to be forwarded to sharding to just be dropped

* Update docs to say direct replication is on my default

* Doh
This commit is contained in:
Christopher Batey 2020-08-14 15:02:38 +01:00
parent 260276fd90
commit eae102acb1
5 changed files with 32 additions and 52 deletions

View file

@ -46,6 +46,17 @@ trait ReplicatedShardingExtension extends Extension {
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
*/
def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E]
/**
* Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those.
*
* @param thisReplica If provided saves messages being forwarded to sharding for this replica
* @tparam M The type of messages the replicated event sourced actor accepts
* @tparam E The type of envelope used for routing messages to actors, the same for all replicas
*
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
*/
def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E]
}
/**

View file

@ -9,15 +9,14 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.ReplicaId
import akka.util.ccompat.JavaConverters._
/**
* INTERNAL API
*
* Used when sharding Replicated Event Sourced entities in multiple instances of sharding, for example one per DC in a Multi DC
* Akka Cluster.
*
@ -35,8 +34,8 @@ import akka.util.ccompat.JavaConverters._
* The events are forwarded as [[akka.cluster.sharding.typed.ShardingEnvelope]] this will work out of the box both
* by default and with a custom extractor since the envelopes are handled internally.
*/
@ApiMayChange
object ShardingDirectReplication {
@InternalApi
private[akka] object ShardingDirectReplication {
/**
* Not for user extension
@ -52,42 +51,7 @@ object ShardingDirectReplication {
private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command
/**
* Java API:
* Factory for when the self replica id is unknown (or multiple)
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def create[T](replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(None, replicaShardingProxies.asScala.toMap)
/**
* Java API:
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def create[T](
selfReplica: ReplicaId,
replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(Some(selfReplica), replicaShardingProxies.asScala.toMap)
/**
* Scala API:
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(None, replicaShardingProxies)
/**
* Scala API:
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(Some(selfReplica), replicaShardingProxies)
private def apply[T](
selfReplica: Option[ReplicaId],
replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
def apply[T](selfReplica: Option[ReplicaId], replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
Behaviors.setup[Command] { context =>
context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas",

View file

@ -34,7 +34,15 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
private val logger = LoggerFactory.getLogger(getClass)
override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = {
override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] =
initInternal(None, settings)
override def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] =
initInternal(Some(thisReplica), settings)
private def initInternal[M, E](
thisReplica: Option[ReplicaId],
settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = {
val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map {
case (replicaSettings, typeName) =>
@ -57,7 +65,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
if (settings.directReplication) {
logger.infoN("Starting Replicated Event Sourcing Direct Replication")
system.systemActorOf(
ShardingDirectReplication(replicaToRegionOrProxy),
ShardingDirectReplication(thisReplica, replicaToRegionOrProxy),
s"directReplication-${counter.incrementAndGet()}")
}

View file

@ -26,7 +26,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit
val replicationActor = spawn(
ShardingDirectReplication(
typed.ReplicaId("ReplicaA"),
Some(typed.ReplicaId("ReplicaA")),
replicaShardingProxies = Map(
ReplicaId("ReplicaA") -> replicaAProbe.ref,
ReplicaId("ReplicaB") -> replicaBProbe.ref,

View file

@ -384,18 +384,15 @@ with a single stream of tagged events from all replicas without duplicates.
## Direct Replication of Events
Normally an event has to be written in the journal and then picked up by the trailing read journal in the other replicas.
As an optimization the replicated events can be published across the Akka cluster to the replicas. The read side
query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most
In addition to reading each replica's events from the database the replicated events are published across the Akka cluster to the replicas when used with Cluster Sharding.
The query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most
events will arrive at the replicas through the cluster.
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedEntityProvider] (if not using
replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor).
This feature is enabled by default when using sharding.
To disable this feature you first need to disable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
and then disable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedEntityProvider]
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,
the @apidoc[ShardingDirectReplication] actor subscribes to these events and forwards them to the replicas allowing them
to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)).
## Hot Standby