Fix for ReplicatedShardingSpec failing #29674

This commit is contained in:
Johan Andrén 2020-09-28 16:33:21 +02:00 committed by GitHub
parent 7a0e1a63d9
commit a05712dff7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 7 deletions

View file

@ -75,12 +75,17 @@ private[akka] object ShardingDirectReplication {
event.sequenceNumber) event.sequenceNumber)
replicaShardingProxies.foreach { replicaShardingProxies.foreach {
case (replica, proxy) => case (replica, proxy) =>
val newId = ReplicationId.fromString(event.persistenceId.id).withReplica(replica) val newId = replicationId.withReplica(replica)
val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, event) val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, event)
if (!selfReplica.contains(replica)) { if (!selfReplica.contains(replica)) {
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
} }
} }
} else {
context.log.traceN(
"Not forwarding event for persistence id [{}] to replicas (wrong type name, expected [{}]).",
event.persistenceId,
typeName)
} }
} }
Behaviors.same Behaviors.same

View file

@ -43,13 +43,12 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
thisReplica: Option[ReplicaId], thisReplica: Option[ReplicaId],
settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = { settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = {
require(settings.replicas.nonEmpty, "Replicas must not be empty") require(settings.replicas.nonEmpty, "Replicas must not be empty")
val typeName = settings.replicas.head._1.entity.typeKey.name
val sharding = ClusterSharding(system) val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map { val initializedReplicas = settings.replicas.map {
case (replicaSettings, typeName) => case (replicaSettings, typeName) =>
// start up a sharding instance per replica id // start up a sharding instance per replica id
logger.infoN( logger.infoN(
"Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])", "Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}], typeName [{}])",
replicaSettings.replicaId.id, replicaSettings.replicaId.id,
replicaSettings.entity.typeKey.name) replicaSettings.entity.typeKey.name)
val regionOrProxy = sharding.init(replicaSettings.entity) val regionOrProxy = sharding.init(replicaSettings.entity)
@ -60,13 +59,14 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
regionOrProxy, regionOrProxy,
replicaSettings.entity.dataCenter) replicaSettings.entity.dataCenter)
} }
if (settings.directReplication) {
val replicaToRegionOrProxy = initializedReplicas.map { val replicaToRegionOrProxy = initializedReplicas.map {
case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy
}.toMap }.toMap
if (settings.directReplication) { val typeNameWithoutReplicaId = settings.replicas.head._2
logger.infoN("Starting Replicated Event Sourcing Direct Replication") logger.infoN("Starting Replicated Event Sourcing Direct Replication")
system.systemActorOf( system.systemActorOf(
ShardingDirectReplication(typeName, thisReplica, replicaToRegionOrProxy), ShardingDirectReplication(typeNameWithoutReplicaId, thisReplica, replicaToRegionOrProxy),
s"directReplication-${counter.incrementAndGet()}") s"directReplication-${counter.incrementAndGet()}")
} }