diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala index fdb843962b..1c1315b459 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala @@ -46,6 +46,8 @@ object ActiveActiveShardingDirectReplication { @InternalApi private[akka] case class VerifyStarted(replyTo: ActorRef[Done]) extends Command + private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command + /** * Java API: * @param selfReplica The replica id of the replica that runs on this node @@ -60,31 +62,30 @@ object ActiveActiveShardingDirectReplication { * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system */ def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] = - Behaviors - .setup[Any] { context => - context.log.debug( - "Subscribing to event stream to forward events to [{}] sharded replicas", - replicaShardingProxies.size - 1) - context.system.eventStream ! EventStream.Subscribe[PublishedEvent](context.self) + Behaviors.setup[Command] { context => + context.log.debug( + "Subscribing to event stream to forward events to [{}] sharded replicas", + replicaShardingProxies.size - 1) + val publishedEventAdapter = context.messageAdapter[PublishedEvent](WrappedPublishedEvent.apply) + context.system.eventStream ! EventStream.Subscribe[PublishedEvent](publishedEventAdapter) - Behaviors.receiveMessagePartial { - case event: PublishedEvent => - context.log.trace( - "Forwarding event for persistence id [{}] sequence nr [{}] to replicas", - event.persistenceId, - event.sequenceNumber) - replicaShardingProxies.foreach { - case (replica, proxy) => - val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event) - if (replica != selfReplica) - proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent - } - Behaviors.same - case VerifyStarted(replyTo) => - replyTo ! Done - Behaviors.same - } + Behaviors.receiveMessage { + case WrappedPublishedEvent(event) => + context.log.trace( + "Forwarding event for persistence id [{}] sequence nr [{}] to replicas", + event.persistenceId, + event.sequenceNumber) + replicaShardingProxies.foreach { + case (replica, proxy) => + val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event) + if (replica != selfReplica) + proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent + } + Behaviors.same + case VerifyStarted(replyTo) => + replyTo ! Done + Behaviors.same } - .narrow[Command] + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala index b9a939f387..79952ccf1a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala @@ -4,13 +4,15 @@ package akka.cluster.sharding.typed +import org.scalatest.wordspec.AnyWordSpecLike + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.eventstream.EventStream import akka.persistence.typed.PersistenceId +import akka.persistence.typed.PublishedEvent import akka.persistence.typed.internal.PublishedEventImpl -import org.scalatest.wordspec.AnyWordSpecLike class ActiveActiveShardingDirectReplicationSpec extends ScalaTestWithActorTestKit @@ -20,9 +22,9 @@ class ActiveActiveShardingDirectReplicationSpec "Active active sharding replication" must { "replicate published events to all sharding proxies" in { - val replicaAProbe = createTestProbe[Any]() - val replicaBProbe = createTestProbe[Any]() - val replicaCProbe = createTestProbe[Any]() + val replicaAProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]() + val replicaBProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]() + val replicaCProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]() val replicationActor = spawn( ActiveActiveShardingDirectReplication( @@ -42,8 +44,8 @@ class ActiveActiveShardingDirectReplicationSpec System.currentTimeMillis()) system.eventStream ! EventStream.Publish(event) - replicaBProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event) - replicaCProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event) + replicaBProbe.receiveMessage().message should equal(event) + replicaCProbe.receiveMessage().message should equal(event) replicaAProbe.expectNoMessage() // no publishing to the replica emitting it } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala index a899beebc4..3f33a41381 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala @@ -49,8 +49,8 @@ class EventPublishingSpec "EventPublishing support" must { "publish events after written for any actor" in { - val topicProbe = createTestProbe[Any]() - system.eventStream ! EventStream.Subscribe[PublishedEvent](topicProbe.ref.narrow) + val topicProbe = createTestProbe[PublishedEvent]() + system.eventStream ! EventStream.Subscribe(topicProbe.ref) // We don't verify subscription completed (no ack available), but expect the next steps to take enough time // for subscription to complete @@ -61,7 +61,7 @@ class EventPublishingSpec wowSuchActor ! WowSuchEventSourcingBehavior.StoreThis("great stuff", tagIt = false, replyTo = persistProbe.ref) persistProbe.expectMessage(Done) - val published1 = topicProbe.expectMessageType[PublishedEvent] + val published1 = topicProbe.receiveMessage() published1.persistenceId should ===(myId) published1.event should ===(WowSuchEventSourcingBehavior.Event("great stuff", false)) published1.sequenceNumber should ===(1L) @@ -72,7 +72,7 @@ class EventPublishingSpec anotherActor ! WowSuchEventSourcingBehavior.StoreThis("another event", tagIt = true, replyTo = persistProbe.ref) persistProbe.expectMessage(Done) - val published2 = topicProbe.expectMessageType[PublishedEvent] + val published2 = topicProbe.receiveMessage() published2.persistenceId should ===(anotherId) published2.event should ===(WowSuchEventSourcingBehavior.Event("another event", true)) published2.sequenceNumber should ===(1L)