Handle Replicator.Unsubscribe in Typed scaladsl, #28061

* scaladsl.Unsubscribe was missing in ReplicatorBehavior
* also noticed a Changed vs SubscribeResponse leftover from
  previous api change
This commit is contained in:
Patrik Nordwall 2019-10-26 10:49:50 +02:00
parent 1b44348425
commit 36aa8fd48a
5 changed files with 50 additions and 10 deletions

View file

@ -48,11 +48,11 @@ import akka.actor.typed.Terminated
def withState( def withState(
subscribeAdapters: Map[ subscribeAdapters: Map[
ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[JReplicator.SubscribeResponse[ReplicatedData]],
ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command] = { ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]]]): Behavior[SReplicator.Command] = {
def stopSubscribeAdapter( def stopSubscribeAdapter(
subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command] = { subscriber: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]]): Behavior[SReplicator.Command] = {
subscribeAdapters.get(subscriber) match { subscribeAdapters.get(subscriber) match {
case Some(adapter) => case Some(adapter) =>
// will be unsubscribed from classicReplicator via Terminated // will be unsubscribed from classicReplicator via Terminated
@ -122,14 +122,14 @@ import akka.actor.typed.Terminated
Behaviors.same Behaviors.same
case cmd: SReplicator.Subscribe[_] => case cmd: SReplicator.Subscribe[_] =>
// For the Scala API the Changed messages can be sent directly to the subscriber // For the Scala API the SubscribeResponse messages can be sent directly to the subscriber
classicReplicator.tell( classicReplicator.tell(
dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toClassic), dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toClassic),
sender = cmd.subscriber.toClassic) sender = cmd.subscriber.toClassic)
Behaviors.same Behaviors.same
case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked => case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked =>
// For the Java API the Changed messages must be mapped to the JReplicator.Changed class. // For the Java API the Changed/Deleted messages must be mapped to the JReplicator.Changed/Deleted class.
// That is done with an adapter, and we have to keep track of the lifecycle of the original // That is done with an adapter, and we have to keep track of the lifecycle of the original
// subscriber and stop the adapter when the original subscriber is stopped. // subscriber and stop the adapter when the original subscriber is stopped.
val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter { val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter {
@ -152,6 +152,12 @@ import akka.actor.typed.Terminated
} }
Behaviors.same Behaviors.same
case cmd: SReplicator.Unsubscribe[_] =>
classicReplicator.tell(
dd.Replicator.Unsubscribe(cmd.key, cmd.subscriber.toClassic),
sender = cmd.subscriber.toClassic)
Behaviors.same
case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked => case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked =>
stopSubscribeAdapter(cmd.subscriber) stopSubscribeAdapter(cmd.subscriber)
@ -202,7 +208,7 @@ import akka.actor.typed.Terminated
} }
} }
.receiveSignal { .receiveSignal {
case (_, Terminated(ref: ActorRef[JReplicator.Changed[ReplicatedData]] @unchecked)) => case (_, Terminated(ref: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]] @unchecked)) =>
stopSubscribeAdapter(ref) stopSubscribeAdapter(ref)
} }
} }

View file

@ -250,14 +250,16 @@ object Replicator {
* If the key is deleted the subscriber is notified with a [[Deleted]] * If the key is deleted the subscriber is notified with a [[Deleted]]
* message. * message.
*/ */
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]])
extends Command
/** /**
* Unregister a subscriber. * Unregister a subscriber.
* *
* @see [[Subscribe]] * @see [[Subscribe]]
*/ */
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]])
extends Command
/** /**
* @see [[Subscribe]] * @see [[Subscribe]]

View file

@ -45,6 +45,10 @@ interface ReplicatorDocSample {
} }
} }
enum Unsubscribe implements Command {
INSTANCE
}
private interface InternalCommand extends Command {} private interface InternalCommand extends Command {}
private static class InternalUpdateResponse implements InternalCommand { private static class InternalUpdateResponse implements InternalCommand {
@ -115,6 +119,7 @@ interface ReplicatorDocSample {
.onMessage(InternalUpdateResponse.class, msg -> Behaviors.same()) .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
.onMessage(GetValue.class, this::onGetValue) .onMessage(GetValue.class, this::onGetValue)
.onMessage(GetCachedValue.class, this::onGetCachedValue) .onMessage(GetCachedValue.class, this::onGetCachedValue)
.onMessage(Unsubscribe.class, this::onUnsubscribe)
.onMessage(InternalGetResponse.class, this::onInternalGetResponse) .onMessage(InternalGetResponse.class, this::onInternalGetResponse)
.onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse) .onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse)
.build(); .build();
@ -147,6 +152,11 @@ interface ReplicatorDocSample {
return this; return this;
} }
private Behavior<Command> onUnsubscribe(Unsubscribe cmd) {
replicatorAdapter.unsubscribe(key);
return this;
}
private Behavior<Command> onInternalGetResponse(InternalGetResponse msg) { private Behavior<Command> onInternalGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof Replicator.GetSuccess) { if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue(); int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();

View file

@ -20,6 +20,8 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import java.time.Duration;
import static jdocs.akka.cluster.ddata.typed.javadsl.ReplicatorDocSample.Counter; import static jdocs.akka.cluster.ddata.typed.javadsl.ReplicatorDocSample.Counter;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -48,7 +50,7 @@ public class ReplicatorDocTest extends JUnitSuite {
} }
@Test @Test
public void shouldHaveApiForSubscribe() { public void shouldHaveApiForSubscribeAndUnsubscribe() {
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class); TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
ActorRef<Counter.Command> client = ActorRef<Counter.Command> client =
testKit.spawn(Counter.create(GCounterKey.create("counter2"))); testKit.spawn(Counter.create(GCounterKey.create("counter2")));
@ -69,6 +71,13 @@ public class ReplicatorDocTest extends JUnitSuite {
probe.expectMessage(3); probe.expectMessage(3);
return null; return null;
}); });
client.tell(Counter.Unsubscribe.INSTANCE);
client.tell(Counter.Increment.INSTANCE);
// wait so it would update the cached value if we didn't unsubscribe
probe.expectNoMessage(Duration.ofMillis(500));
client.tell(new Counter.GetCachedValue(probe.getRef()));
probe.expectMessage(3); // old value, not 4
} }
@Test @Test

View file

@ -4,6 +4,7 @@
package docs.akka.cluster.ddata.typed.scaladsl package docs.akka.cluster.ddata.typed.scaladsl
import scala.concurrent.duration._
import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.ddata.SelfUniqueAddress
import akka.cluster.ddata.typed.scaladsl.DistributedData import akka.cluster.ddata.typed.scaladsl.DistributedData
import akka.cluster.ddata.typed.scaladsl.Replicator import akka.cluster.ddata.typed.scaladsl.Replicator
@ -37,6 +38,7 @@ object ReplicatorDocSpec {
final case object Increment extends Command final case object Increment extends Command
final case class GetValue(replyTo: ActorRef[Int]) extends Command final case class GetValue(replyTo: ActorRef[Int]) extends Command
final case class GetCachedValue(replyTo: ActorRef[Int]) extends Command final case class GetCachedValue(replyTo: ActorRef[Int]) extends Command
case object Unsubscribe extends Command
private sealed trait InternalCommand extends Command private sealed trait InternalCommand extends Command
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalCommand private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalCommand
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
@ -76,6 +78,10 @@ object ReplicatorDocSpec {
replyTo ! cachedValue replyTo ! cachedValue
Behaviors.same Behaviors.same
case Unsubscribe =>
replicatorAdapter.unsubscribe(key)
Behaviors.same
case internal: InternalCommand => case internal: InternalCommand =>
internal match { internal match {
case InternalUpdateResponse(_) => Behaviors.same // ok case InternalUpdateResponse(_) => Behaviors.same // ok
@ -125,7 +131,7 @@ class ReplicatorDocSpec
probe.expectMessage(1) probe.expectMessage(1)
} }
"have API for Subscribe" in { "have API for Subscribe and Unsubscribe" in {
val c = spawn(Counter(GCounterKey("counter2"))) val c = spawn(Counter(GCounterKey("counter2")))
val probe = createTestProbe[Int]() val probe = createTestProbe[Int]()
@ -140,6 +146,13 @@ class ReplicatorDocSpec
c ! Counter.GetCachedValue(probe.ref) c ! Counter.GetCachedValue(probe.ref)
probe.expectMessage(3) probe.expectMessage(3)
} }
c ! Counter.Unsubscribe
c ! Counter.Increment
// wait so it would update the cached value if we didn't unsubscribe
probe.expectNoMessage(500.millis)
c ! Counter.GetCachedValue(probe.ref)
probe.expectMessage(3) // old value, not 4
} }
"have an extension" in { "have an extension" in {