message adapter in ActiveActiveShardingDirectReplication (#29328)

* I don't think it's worth cheating with Any and narrow just to save
  one or two allocations given how many other things that are needed for
  sending the messages
This commit is contained in:
Patrik Nordwall 2020-06-30 09:28:48 +02:00 committed by Christopher Batey
parent e79f5ac3c4
commit 238d55a413
3 changed files with 37 additions and 34 deletions

View file

@ -46,6 +46,8 @@ object ActiveActiveShardingDirectReplication {
@InternalApi
private[akka] case class VerifyStarted(replyTo: ActorRef[Done]) extends Command
private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command
/**
* Java API:
* @param selfReplica The replica id of the replica that runs on this node
@ -60,31 +62,30 @@ object ActiveActiveShardingDirectReplication {
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] =
Behaviors
.setup[Any] { context =>
context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas",
replicaShardingProxies.size - 1)
context.system.eventStream ! EventStream.Subscribe[PublishedEvent](context.self)
Behaviors.setup[Command] { context =>
context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas",
replicaShardingProxies.size - 1)
val publishedEventAdapter = context.messageAdapter[PublishedEvent](WrappedPublishedEvent.apply)
context.system.eventStream ! EventStream.Subscribe[PublishedEvent](publishedEventAdapter)
Behaviors.receiveMessagePartial {
case event: PublishedEvent =>
context.log.trace(
"Forwarding event for persistence id [{}] sequence nr [{}] to replicas",
event.persistenceId,
event.sequenceNumber)
replicaShardingProxies.foreach {
case (replica, proxy) =>
val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event)
if (replica != selfReplica)
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
}
Behaviors.same
case VerifyStarted(replyTo) =>
replyTo ! Done
Behaviors.same
}
Behaviors.receiveMessage {
case WrappedPublishedEvent(event) =>
context.log.trace(
"Forwarding event for persistence id [{}] sequence nr [{}] to replicas",
event.persistenceId,
event.sequenceNumber)
replicaShardingProxies.foreach {
case (replica, proxy) =>
val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event)
if (replica != selfReplica)
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
}
Behaviors.same
case VerifyStarted(replyTo) =>
replyTo ! Done
Behaviors.same
}
.narrow[Command]
}
}

View file

@ -4,13 +4,15 @@
package akka.cluster.sharding.typed
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.eventstream.EventStream
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.internal.PublishedEventImpl
import org.scalatest.wordspec.AnyWordSpecLike
class ActiveActiveShardingDirectReplicationSpec
extends ScalaTestWithActorTestKit
@ -20,9 +22,9 @@ class ActiveActiveShardingDirectReplicationSpec
"Active active sharding replication" must {
"replicate published events to all sharding proxies" in {
val replicaAProbe = createTestProbe[Any]()
val replicaBProbe = createTestProbe[Any]()
val replicaCProbe = createTestProbe[Any]()
val replicaAProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]()
val replicaBProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]()
val replicaCProbe = createTestProbe[ShardingEnvelope[PublishedEvent]]()
val replicationActor = spawn(
ActiveActiveShardingDirectReplication(
@ -42,8 +44,8 @@ class ActiveActiveShardingDirectReplicationSpec
System.currentTimeMillis())
system.eventStream ! EventStream.Publish(event)
replicaBProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event)
replicaCProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event)
replicaBProbe.receiveMessage().message should equal(event)
replicaCProbe.receiveMessage().message should equal(event)
replicaAProbe.expectNoMessage() // no publishing to the replica emitting it
}

View file

@ -49,8 +49,8 @@ class EventPublishingSpec
"EventPublishing support" must {
"publish events after written for any actor" in {
val topicProbe = createTestProbe[Any]()
system.eventStream ! EventStream.Subscribe[PublishedEvent](topicProbe.ref.narrow)
val topicProbe = createTestProbe[PublishedEvent]()
system.eventStream ! EventStream.Subscribe(topicProbe.ref)
// We don't verify subscription completed (no ack available), but expect the next steps to take enough time
// for subscription to complete
@ -61,7 +61,7 @@ class EventPublishingSpec
wowSuchActor ! WowSuchEventSourcingBehavior.StoreThis("great stuff", tagIt = false, replyTo = persistProbe.ref)
persistProbe.expectMessage(Done)
val published1 = topicProbe.expectMessageType[PublishedEvent]
val published1 = topicProbe.receiveMessage()
published1.persistenceId should ===(myId)
published1.event should ===(WowSuchEventSourcingBehavior.Event("great stuff", false))
published1.sequenceNumber should ===(1L)
@ -72,7 +72,7 @@ class EventPublishingSpec
anotherActor ! WowSuchEventSourcingBehavior.StoreThis("another event", tagIt = true, replyTo = persistProbe.ref)
persistProbe.expectMessage(Done)
val published2 = topicProbe.expectMessageType[PublishedEvent]
val published2 = topicProbe.receiveMessage()
published2.persistenceId should ===(anotherId)
published2.event should ===(WowSuchEventSourcingBehavior.Event("another event", true))
published2.sequenceNumber should ===(1L)