Unwrap tagged replicated events before publishing to other replicas (#30136)
This commit is contained in:
parent
c14e8e6c44
commit
0326e11387
4 changed files with 16 additions and 1 deletions
|
|
@ -76,7 +76,9 @@ private[akka] object ShardingDirectReplication {
|
||||||
replicaShardingProxies.foreach {
|
replicaShardingProxies.foreach {
|
||||||
case (replica, proxy) =>
|
case (replica, proxy) =>
|
||||||
val newId = replicationId.withReplica(replica)
|
val newId = replicationId.withReplica(replica)
|
||||||
val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, event)
|
// receiving side is responsible for any tagging, so drop/unwrap any tags added by the local tagger
|
||||||
|
val withoutTags = event.withoutTags
|
||||||
|
val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, withoutTags)
|
||||||
if (!selfReplica.contains(replica)) {
|
if (!selfReplica.contains(replica)) {
|
||||||
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
|
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
# not for user extension
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.PublishedEvent.withoutTags")
|
||||||
|
|
@ -30,4 +30,10 @@ trait PublishedEvent {
|
||||||
def event: Any
|
def event: Any
|
||||||
def timestamp: Long
|
def timestamp: Long
|
||||||
def tags: Set[String]
|
def tags: Set[String]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the published event is tagged, return a new published event with the payload unwrapped and the tags dropped,
|
||||||
|
* if it is not tagged return the published event as is.
|
||||||
|
*/
|
||||||
|
def withoutTags: PublishedEvent
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -377,5 +377,10 @@ private[akka] final case class PublishedEventImpl(
|
||||||
case _ => payload
|
case _ => payload
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def withoutTags: PublishedEvent = payload match {
|
||||||
|
case Tagged(event, _) => copy(payload = event)
|
||||||
|
case _ => this
|
||||||
|
}
|
||||||
|
|
||||||
override def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.asJava
|
override def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.asJava
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue