diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala index 0d21ef68ae..b5daf9a5b6 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala @@ -48,11 +48,11 @@ import akka.actor.typed.Terminated def withState( subscribeAdapters: Map[ - ActorRef[JReplicator.Changed[ReplicatedData]], - ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command] = { + ActorRef[JReplicator.SubscribeResponse[ReplicatedData]], + ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]]]): Behavior[SReplicator.Command] = { def stopSubscribeAdapter( - subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command] = { + subscriber: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]]): Behavior[SReplicator.Command] = { subscribeAdapters.get(subscriber) match { case Some(adapter) => // will be unsubscribed from classicReplicator via Terminated @@ -122,14 +122,14 @@ import akka.actor.typed.Terminated Behaviors.same case cmd: SReplicator.Subscribe[_] => - // For the Scala API the Changed messages can be sent directly to the subscriber + // For the Scala API the SubscribeResponse messages can be sent directly to the subscriber classicReplicator.tell( dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toClassic), sender = cmd.subscriber.toClassic) Behaviors.same case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked => - // For the Java API the Changed messages must be mapped to the JReplicator.Changed class. + // For the Java API the Changed/Deleted messages must be mapped to the JReplicator.Changed/Deleted class. // That is done with an adapter, and we have to keep track of the lifecycle of the original // subscriber and stop the adapter when the original subscriber is stopped. val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter { @@ -152,6 +152,12 @@ import akka.actor.typed.Terminated } Behaviors.same + case cmd: SReplicator.Unsubscribe[_] => + classicReplicator.tell( + dd.Replicator.Unsubscribe(cmd.key, cmd.subscriber.toClassic), + sender = cmd.subscriber.toClassic) + Behaviors.same + case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked => stopSubscribeAdapter(cmd.subscriber) @@ -202,7 +208,7 @@ import akka.actor.typed.Terminated } } .receiveSignal { - case (_, Terminated(ref: ActorRef[JReplicator.Changed[ReplicatedData]] @unchecked)) => + case (_, Terminated(ref: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]] @unchecked)) => stopSubscribeAdapter(ref) } } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala index f235eb8624..7e03569e51 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala @@ -250,14 +250,16 @@ object Replicator { * If the key is deleted the subscriber is notified with a [[Deleted]] * message. */ - final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command + final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]]) + extends Command /** * Unregister a subscriber. * * @see [[Subscribe]] */ - final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command + final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]]) + extends Command /** * @see [[Subscribe]] diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java index 87360f9a58..5fce766193 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java @@ -45,6 +45,10 @@ interface ReplicatorDocSample { } } + enum Unsubscribe implements Command { + INSTANCE + } + private interface InternalCommand extends Command {} private static class InternalUpdateResponse implements InternalCommand { @@ -115,6 +119,7 @@ interface ReplicatorDocSample { .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same()) .onMessage(GetValue.class, this::onGetValue) .onMessage(GetCachedValue.class, this::onGetCachedValue) + .onMessage(Unsubscribe.class, this::onUnsubscribe) .onMessage(InternalGetResponse.class, this::onInternalGetResponse) .onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse) .build(); @@ -147,6 +152,11 @@ interface ReplicatorDocSample { return this; } + private Behavior onUnsubscribe(Unsubscribe cmd) { + replicatorAdapter.unsubscribe(key); + return this; + } + private Behavior onInternalGetResponse(InternalGetResponse msg) { if (msg.rsp instanceof Replicator.GetSuccess) { int value = ((Replicator.GetSuccess) msg.rsp).get(key).getValue().intValue(); diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java index 8f349cffbb..86da861bae 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java @@ -20,6 +20,8 @@ import org.junit.Rule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; +import java.time.Duration; + import static jdocs.akka.cluster.ddata.typed.javadsl.ReplicatorDocSample.Counter; import static org.junit.Assert.assertEquals; @@ -48,7 +50,7 @@ public class ReplicatorDocTest extends JUnitSuite { } @Test - public void shouldHaveApiForSubscribe() { + public void shouldHaveApiForSubscribeAndUnsubscribe() { TestProbe probe = testKit.createTestProbe(Integer.class); ActorRef client = testKit.spawn(Counter.create(GCounterKey.create("counter2"))); @@ -69,6 +71,13 @@ public class ReplicatorDocTest extends JUnitSuite { probe.expectMessage(3); return null; }); + + client.tell(Counter.Unsubscribe.INSTANCE); + client.tell(Counter.Increment.INSTANCE); + // wait so it would update the cached value if we didn't unsubscribe + probe.expectNoMessage(Duration.ofMillis(500)); + client.tell(new Counter.GetCachedValue(probe.getRef())); + probe.expectMessage(3); // old value, not 4 } @Test diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala index ef0bfb72d8..87d7d2ccb4 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala @@ -4,6 +4,7 @@ package docs.akka.cluster.ddata.typed.scaladsl +import scala.concurrent.duration._ import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.ddata.typed.scaladsl.DistributedData import akka.cluster.ddata.typed.scaladsl.Replicator @@ -37,6 +38,7 @@ object ReplicatorDocSpec { final case object Increment extends Command final case class GetValue(replyTo: ActorRef[Int]) extends Command final case class GetCachedValue(replyTo: ActorRef[Int]) extends Command + case object Unsubscribe extends Command private sealed trait InternalCommand extends Command private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalCommand private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) @@ -76,6 +78,10 @@ object ReplicatorDocSpec { replyTo ! cachedValue Behaviors.same + case Unsubscribe => + replicatorAdapter.unsubscribe(key) + Behaviors.same + case internal: InternalCommand => internal match { case InternalUpdateResponse(_) => Behaviors.same // ok @@ -125,7 +131,7 @@ class ReplicatorDocSpec probe.expectMessage(1) } - "have API for Subscribe" in { + "have API for Subscribe and Unsubscribe" in { val c = spawn(Counter(GCounterKey("counter2"))) val probe = createTestProbe[Int]() @@ -140,6 +146,13 @@ class ReplicatorDocSpec c ! Counter.GetCachedValue(probe.ref) probe.expectMessage(3) } + + c ! Counter.Unsubscribe + c ! Counter.Increment + // wait so it would update the cached value if we didn't unsubscribe + probe.expectNoMessage(500.millis) + c ! Counter.GetCachedValue(probe.ref) + probe.expectMessage(3) // old value, not 4 } "have an extension" in {