diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala index c2fb713931..6b21dc1858 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala @@ -68,9 +68,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( def subscribe(key: Key[B], responseAdapter: akka.japi.function.Function[Replicator.SubscribeResponse[B], A]): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) - changedMessageAdapters.get(key).foreach { subscriber => - replicator ! Replicator.Unsubscribe(key, subscriber) - } + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = context.messageAdapter(classOf[Replicator.SubscribeResponse[B]], responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala index 8b923cc998..d95dbb5115 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala @@ -71,9 +71,7 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( def subscribe(key: Key[B], responseAdapter: Replicator.SubscribeResponse[B] => A): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) - changedMessageAdapters.get(key).foreach { subscriber => - replicator ! Replicator.Unsubscribe(key, subscriber) - } + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = context.messageAdapter[Replicator.SubscribeResponse[B]](responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo)