From 04e22c3687a3b6ae5b13e9c5360af5dd6f389393 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Tue, 11 Jan 2022 04:31:51 -0700 Subject: [PATCH] Update ReplicatorMessageAdapter#subscribe (#31050) The method was calling `unsubscribe` twice. --- .../ddata/typed/javadsl/ReplicatorMessageAdapter.scala | 4 +--- .../ddata/typed/scaladsl/ReplicatorMessageAdapter.scala | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala index c2fb713931..6b21dc1858 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala @@ -68,9 +68,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( def subscribe(key: Key[B], responseAdapter: akka.japi.function.Function[Replicator.SubscribeResponse[B], A]): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) - changedMessageAdapters.get(key).foreach { subscriber => - replicator ! Replicator.Unsubscribe(key, subscriber) - } + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = context.messageAdapter(classOf[Replicator.SubscribeResponse[B]], responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala index 8b923cc998..d95dbb5115 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala @@ -71,9 +71,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( def subscribe(key: Key[B], responseAdapter: Replicator.SubscribeResponse[B] => A): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) - changedMessageAdapters.get(key).foreach { subscriber => - replicator ! Replicator.Unsubscribe(key, subscriber) - } + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = context.messageAdapter[Replicator.SubscribeResponse[B]](responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo)