diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala index 90f0482bba..723b746c3e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala @@ -76,7 +76,9 @@ private[akka] object ShardingDirectReplication { replicaShardingProxies.foreach { case (replica, proxy) => val newId = replicationId.withReplica(replica) - val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, event) + // receiving side is responsible for any tagging, so drop/unwrap any tags added by the local tagger + val withoutTags = event.withoutTags + val envelopedEvent = ShardingEnvelope(newId.persistenceId.id, withoutTags) if (!selfReplica.contains(replica)) { proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent } diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.13.backwards.excludes/pr-30080-replicated-publishing-tagged-events.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.13.backwards.excludes/pr-30080-replicated-publishing-tagged-events.excludes new file mode 100644 index 0000000000..3613f36af0 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.13.backwards.excludes/pr-30080-replicated-publishing-tagged-events.excludes @@ -0,0 +1,2 @@ +# not for user extension +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.PublishedEvent.withoutTags") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala index 7a128913a3..b0adfa1583 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala @@ -30,4 +30,10 @@ trait PublishedEvent { def event: Any def timestamp: Long def tags: Set[String] + + /** + * If the published event is tagged, return a new published event with the payload unwrapped and the tags dropped, + * if it is not tagged return the published event as is. + */ + def withoutTags: PublishedEvent } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index f6d43f95df..636543678f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -377,5 +377,10 @@ private[akka] final case class PublishedEventImpl( case _ => payload } + override def withoutTags: PublishedEvent = payload match { + case Tagged(event, _) => copy(payload = event) + case _ => this + } + override def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.asJava }