diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala index 4223b0e53b..db7637b31e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala @@ -75,12 +75,17 @@ private[akka] object ShardingDirectReplication { event.sequenceNumber) replicaShardingProxies.foreach { case (replica, proxy) => - val newId = ReplicationId.fromString(event.persistenceId.id).withReplica(replica) + val newId = replicationId.withReplica(replica) val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, event) if (!selfReplica.contains(replica)) { proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent } } + } else { + context.log.traceN( + "Not forwarding event for persistence id [{}] to replicas (wrong type name, expected [{}]).", + event.persistenceId, + typeName) } } Behaviors.same diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala index eef99463da..da37282ffd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala @@ -43,13 +43,12 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] thisReplica: Option[ReplicaId], settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = { require(settings.replicas.nonEmpty, "Replicas must not be empty") - val typeName = settings.replicas.head._1.entity.typeKey.name val sharding = ClusterSharding(system) val initializedReplicas = settings.replicas.map { case (replicaSettings, typeName) => // start up a sharding instance per replica id logger.infoN( - "Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])", + "Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}], typeName [{}])", replicaSettings.replicaId.id, replicaSettings.entity.typeKey.name) val regionOrProxy = sharding.init(replicaSettings.entity) @@ -60,13 +59,14 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] regionOrProxy, replicaSettings.entity.dataCenter) } - val replicaToRegionOrProxy = initializedReplicas.map { - case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy - }.toMap if (settings.directReplication) { + val replicaToRegionOrProxy = initializedReplicas.map { + case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy + }.toMap + val typeNameWithoutReplicaId = settings.replicas.head._2 logger.infoN("Starting Replicated Event Sourcing Direct Replication") system.systemActorOf( - ShardingDirectReplication(typeName, thisReplica, replicaToRegionOrProxy), + ShardingDirectReplication(typeNameWithoutReplicaId, thisReplica, replicaToRegionOrProxy), s"directReplication-${counter.incrementAndGet()}") }