From 20fce3766530a58e4725e1f1fd5af2e87df224a1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Sep 2017 15:53:25 +0200 Subject: [PATCH] Subscribe --- .../cluster/ddata/javadsl/ReplicatorTest.java | 110 +++++++++--- .../ddata/scaladsl/ReplicatorSpec.scala | 77 ++++++--- .../ddata/internal/ReplicatorBehavior.scala | 159 ++++++++++++------ .../cluster/ddata/javadsl/Replicator.scala | 44 ++++- .../cluster/ddata/scaladsl/Replicator.scala | 34 +++- 5 files changed, 332 insertions(+), 92 deletions(-) diff --git a/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java index d994265d5a..893166bc97 100644 --- a/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java +++ b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java @@ -21,8 +21,11 @@ import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.javadsl.TestKit; import akka.typed.ActorRef; import akka.typed.Behavior; +import akka.typed.cluster.ddata.javadsl.Replicator.Command; import akka.typed.javadsl.Actor; import akka.typed.javadsl.Adapter; +import akka.typed.javadsl.Actor.MutableBehavior; +import akka.typed.javadsl.ActorContext; public class ReplicatorTest extends JUnitSuite { @@ -40,6 +43,14 @@ public class ReplicatorTest extends JUnitSuite { } } + static final class GetCachedValue implements ClientCommand { + final ActorRef replyTo; + + GetCachedValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + static interface InternalMsg extends ClientCommand { } @@ -59,33 +70,64 @@ public class ReplicatorTest extends JUnitSuite { } } + static final class InternalChanged implements InternalMsg { + final Replicator.Changed chg; + + public InternalChanged(Replicator.Changed chg) { + this.chg = chg; + } + } + static final Key Key = GCounterKey.create("counter"); - static Behavior client(ActorRef> replicator, Cluster node) { - return Actor.deferred(c -> { + static class Client extends MutableBehavior { + private final ActorRef> replicator; + private final Cluster node; + final ActorRef> updateResponseAdapter; + final ActorRef> getResponseAdapter; + final ActorRef> changedAdapter; - final ActorRef> updateResponseAdapter = - c.spawnAdapter(m -> new InternalUpdateResponse<>(m)); + private int cachedValue = 0; - final ActorRef> getResponseAdapter = - c.spawnAdapter(m -> new InternalGetResponse<>(m)); + public Client(ActorRef> replicator, Cluster node, ActorContext ctx) { + this.replicator = replicator; + this.node = node; - return Actor.immutable(ClientCommand.class) - .onMessage(Increment.class, (ctx, cmd) -> { + updateResponseAdapter = ctx.spawnAdapter(m -> new InternalUpdateResponse<>(m)); + + getResponseAdapter = ctx.spawnAdapter(m -> new InternalGetResponse<>(m)); + + changedAdapter = ctx.spawnAdapter(m -> new InternalChanged<>(m)); + + replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter)); + } + + public static Behavior create(ActorRef> replicator, Cluster node) { + return Actor.mutable(ctx -> new Client(replicator, node, ctx)); + } + + @Override + public Actor.Receive createReceive() { + return receiveBuilder() + .onMessage(Increment.class, cmd -> { replicator.tell( new Replicator.Update(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter, curr -> curr.increment(node, 1))); - return Actor.same(); + return this; }) - .onMessage(InternalUpdateResponse.class, (ctx, msg) -> { - return Actor.same(); + .onMessage(InternalUpdateResponse.class, msg -> { + return this; }) - .onMessage(GetValue.class, (ctx, cmd) -> { + .onMessage(GetValue.class, cmd -> { replicator.tell( new Replicator.Get(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); - return Actor.same(); + return this; }) - .onMessage(InternalGetResponse.class, (ctx, msg) -> { + .onMessage(GetCachedValue.class, cmd -> { + cmd.replyTo.tell(cachedValue); + return this; + }) + .onMessage(InternalGetResponse.class, msg -> { if (msg.rsp instanceof Replicator.GetSuccess) { int value = ((Replicator.GetSuccess) msg.rsp).get(Key).getValue().intValue(); ActorRef replyTo = (ActorRef) msg.rsp.request().get(); @@ -93,10 +135,15 @@ public class ReplicatorTest extends JUnitSuite { } else { // not dealing with failures } - return Actor.same(); + return this; + }) + .onMessage(InternalChanged.class, msg -> { + GCounter counter = (GCounter) msg.chg.get(Key); + cachedValue = counter.getValue().intValue(); + return this; }) .build(); - }); + } } @@ -118,17 +165,42 @@ public class ReplicatorTest extends JUnitSuite { @Test - public void apiPrototype() { + public void shouldHaveApiForUpdateAndGet() { TestKit probe = new TestKit(system); akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem()); ActorRef> replicator = - Adapter.spawn(system, Replicator.behavior(settings), "replicator"); + Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef client = - Adapter.spawnAnonymous(system, client(replicator, Cluster.get(system))); + Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); client.tell(new Increment()); client.tell(new GetValue(Adapter.toTyped(probe.getRef()))); probe.expectMsg(1); } + @Test + public void shouldHaveApiForSubscribe() { + TestKit probe = new TestKit(system); + akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem()); + ActorRef> replicator = + Adapter.spawnAnonymous(system, Replicator.behavior(settings)); + ActorRef client = + Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); + + client.tell(new Increment()); + client.tell(new Increment()); + probe.awaitAssert(() -> { + client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef()))); + probe.expectMsg(2); + return null; + }); + + client.tell(new Increment()); + probe.awaitAssert(() -> { + client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef()))); + probe.expectMsg(3); + return null; + }); + } + } diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala index 67dff73e90..a357dec258 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala @@ -20,6 +20,7 @@ import akka.typed.scaladsl.adapter._ import akka.typed.testkit.TestKitSettings import akka.typed.testkit.scaladsl._ import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually object ReplicatorSpec { @@ -32,9 +33,11 @@ object ReplicatorSpec { sealed trait ClientCommand final case object Increment extends ClientCommand final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand + final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand private sealed trait InternalMsg extends ClientCommand private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg + private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg val Key = GCounterKey("counter") @@ -46,33 +49,51 @@ object ReplicatorSpec { val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] = ctx.spawnAdapter(InternalGetResponse.apply) - Actor.immutable[ClientCommand] { (ctx, msg) ⇒ - msg match { - case Increment ⇒ - replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter) - Actor.same + val changedAdapter: ActorRef[Replicator.Changed[GCounter]] = + ctx.spawnAdapter(InternalChanged.apply) - case GetValue(replyTo) ⇒ - replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) - Actor.same + replicator ! Replicator.Subscribe(Key, changedAdapter) - case internal: InternalMsg ⇒ internal match { - case InternalUpdateResponse(_) ⇒ Actor.same // ok - - case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) ⇒ - val value = rsp.get(Key).value.toInt - replyTo ! value + def behavior(cachedValue: Int): Behavior[ClientCommand] = { + Actor.immutable[ClientCommand] { (ctx, msg) ⇒ + msg match { + case Increment ⇒ + replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter) Actor.same - case InternalGetResponse(rsp) ⇒ - Actor.unhandled // not dealing with failures + case GetValue(replyTo) ⇒ + replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) + Actor.same + + case GetCachedValue(replyTo) ⇒ + replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) + Actor.same + + case internal: InternalMsg ⇒ internal match { + case InternalUpdateResponse(_) ⇒ Actor.same // ok + + case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) ⇒ + val value = rsp.get(Key).value.toInt + replyTo ! value + Actor.same + + case InternalGetResponse(rsp) ⇒ + Actor.unhandled // not dealing with failures + + case InternalChanged(chg @ Replicator.Changed(Key)) ⇒ + val value = chg.get(Key).value.intValue + behavior(value) + } } } } + + behavior(cachedValue = 0) } + } -class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) { +class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually { import ReplicatorSpec._ trait RealTests extends StartSupport { @@ -81,10 +102,8 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) { val settings = ReplicatorSettings(system) implicit val cluster = Cluster(system.toUntyped) - def `API prototype`(): Unit = { - + def `have API for Update and Get`(): Unit = { val replicator = start(Replicator.behavior(settings)) - val c = start(client(replicator)) val probe = TestProbe[Int] @@ -93,6 +112,24 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) { probe.expectMsg(1) } + def `have API for Subscribe`(): Unit = { + val replicator = start(Replicator.behavior(settings)) + val c = start(client(replicator)) + + val probe = TestProbe[Int] + c ! Increment + c ! Increment + eventually { + c ! GetCachedValue(probe.ref) + probe.expectMsg(2) + } + c ! Increment + eventually { + c ! GetCachedValue(probe.ref) + probe.expectMsg(3) + } + } + } object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala index e8c1980b8d..6071184197 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala @@ -10,10 +10,14 @@ import scala.concurrent.duration.Duration import akka.annotation.InternalApi import akka.cluster.{ ddata ⇒ dd } import akka.pattern.ask +import akka.typed.ActorRef import akka.typed.Behavior import akka.typed.scaladsl.Actor import akka.typed.scaladsl.adapter._ import akka.util.Timeout +import akka.cluster.ddata.ReplicatedData +import akka.cluster.ddata.Key +import akka.typed.Terminated /** * INTERNAL API @@ -22,6 +26,11 @@ import akka.util.Timeout import akka.typed.cluster.ddata.javadsl.{ Replicator ⇒ JReplicator } import akka.typed.cluster.ddata.scaladsl.{ Replicator ⇒ SReplicator } + private case class InternalChanged[A <: ReplicatedData](chg: dd.Replicator.Changed[A], subscriber: ActorRef[JReplicator.Changed[A]]) + extends JReplicator.Command[A] { + override def key: Key[A] = chg.key + } + val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout val additionalAskTimeout = 1.second @@ -32,59 +41,111 @@ import akka.util.Timeout // FIXME perhaps add supervisor for restarting val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying") - Actor.immutable[SReplicator.Command[_]] { (ctx, msg) ⇒ - msg match { - case cmd: SReplicator.Get[_] ⇒ - untypedReplicator.tell( - dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request), - sender = cmd.replyTo.toUntyped) - Actor.same + def withState( + subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command[_]] = { - case cmd: SReplicator.Update[_] ⇒ - untypedReplicator.tell( - dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), - sender = cmd.replyTo.toUntyped) - Actor.same - - case cmd: JReplicator.Get[d] ⇒ - implicit val timeout = Timeout(cmd.consistency.timeout match { - case Duration.Zero ⇒ localAskTimeout - case t ⇒ t + additionalAskTimeout - }) - import ctx.executionContext - val reply = - (untypedReplicator ? dd.Replicator.Get(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala)) - .mapTo[dd.Replicator.GetResponse[d]].map { - case rsp: dd.Replicator.GetSuccess[d] ⇒ JReplicator.GetSuccess(rsp.key, rsp.request.asJava)(rsp.dataValue) - case rsp: dd.Replicator.NotFound[d] ⇒ JReplicator.NotFound(rsp.key, rsp.request.asJava) - case rsp: dd.Replicator.GetFailure[d] ⇒ JReplicator.GetFailure(rsp.key, rsp.request.asJava) - }.recover { - case _ ⇒ JReplicator.GetFailure(cmd.key, cmd.request) - } - reply.foreach { cmd.replyTo ! _ } - Actor.same - - case cmd: JReplicator.Update[d] ⇒ - implicit val timeout = Timeout(cmd.writeConsistency.timeout match { - case Duration.Zero ⇒ localAskTimeout - case t ⇒ t + additionalAskTimeout - }) - import ctx.executionContext - val reply = - (untypedReplicator ? dd.Replicator.Update(cmd.key, cmd.writeConsistency.toUntyped, cmd.request.asScala)(cmd.modify)) - .mapTo[dd.Replicator.UpdateResponse[d]].map { - case rsp: dd.Replicator.UpdateSuccess[d] ⇒ JReplicator.UpdateSuccess(rsp.key, rsp.request.asJava) - case rsp: dd.Replicator.UpdateTimeout[d] ⇒ JReplicator.UpdateTimeout(rsp.key, rsp.request.asJava) - case rsp: dd.Replicator.ModifyFailure[d] ⇒ JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause, rsp.request.asJava) - case rsp: dd.Replicator.StoreFailure[d] ⇒ JReplicator.StoreFailure(rsp.key, rsp.request.asJava) - }.recover { - case _ ⇒ JReplicator.UpdateTimeout(cmd.key, cmd.request) - } - reply.foreach { cmd.replyTo ! _ } - Actor.same + def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command[_]] = { + subscribeAdapters.get(subscriber) match { + case Some(adapter) ⇒ + // will be unsubscribed from untypedReplicator via Terminated + ctx.stop(adapter) + withState(subscribeAdapters - subscriber) + case None ⇒ // already unsubscribed or terminated + Actor.same + } } + + Actor.immutable[SReplicator.Command[_]] { (ctx, msg) ⇒ + msg match { + case cmd: SReplicator.Get[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request), + sender = cmd.replyTo.toUntyped) + Actor.same + + case cmd: SReplicator.Update[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), + sender = cmd.replyTo.toUntyped) + Actor.same + + case cmd: JReplicator.Get[d] ⇒ + implicit val timeout = Timeout(cmd.consistency.timeout match { + case Duration.Zero ⇒ localAskTimeout + case t ⇒ t + additionalAskTimeout + }) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.Get(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala)) + .mapTo[dd.Replicator.GetResponse[d]].map { + case rsp: dd.Replicator.GetSuccess[d] ⇒ JReplicator.GetSuccess(rsp.key, rsp.request.asJava)(rsp.dataValue) + case rsp: dd.Replicator.NotFound[d] ⇒ JReplicator.NotFound(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.GetFailure[d] ⇒ JReplicator.GetFailure(rsp.key, rsp.request.asJava) + }.recover { + case _ ⇒ JReplicator.GetFailure(cmd.key, cmd.request) + } + reply.foreach { cmd.replyTo ! _ } + Actor.same + + case cmd: JReplicator.Update[d] ⇒ + implicit val timeout = Timeout(cmd.writeConsistency.timeout match { + case Duration.Zero ⇒ localAskTimeout + case t ⇒ t + additionalAskTimeout + }) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.Update(cmd.key, cmd.writeConsistency.toUntyped, cmd.request.asScala)(cmd.modify)) + .mapTo[dd.Replicator.UpdateResponse[d]].map { + case rsp: dd.Replicator.UpdateSuccess[d] ⇒ JReplicator.UpdateSuccess(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.UpdateTimeout[d] ⇒ JReplicator.UpdateTimeout(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.ModifyFailure[d] ⇒ JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause, rsp.request.asJava) + case rsp: dd.Replicator.StoreFailure[d] ⇒ JReplicator.StoreFailure(rsp.key, rsp.request.asJava) + }.recover { + case _ ⇒ JReplicator.UpdateTimeout(cmd.key, cmd.request) + } + reply.foreach { cmd.replyTo ! _ } + Actor.same + + case cmd: SReplicator.Subscribe[_] ⇒ + // For the Scala API the Changed messages can be sent directly to the subscriber + untypedReplicator.tell( + dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toUntyped), + sender = cmd.subscriber.toUntyped) + Actor.same + + case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked ⇒ + // For the Java API the Changed messages must be mapped to the JReplicator.Changed class. + // That is done with an adapter, and we have to keep track of the lifecycle of the original + // subscriber and stop the adapter when the original subscriber is stopped. + val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnAdapter { + chg ⇒ InternalChanged(chg, cmd.subscriber) + } + + untypedReplicator.tell( + dd.Replicator.Subscribe(cmd.key, adapter.toUntyped), + sender = akka.actor.ActorRef.noSender) + + ctx.watch(cmd.subscriber) + + withState(subscribeAdapters.updated(cmd.subscriber, adapter)) + + case InternalChanged(chg, subscriber) ⇒ + subscriber ! JReplicator.Changed(chg.key)(chg.dataValue) + Actor.same + + case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked ⇒ + stopSubscribeAdapter(cmd.subscriber) + + } + } + .onSignal { + case (ctx, Terminated(ref: ActorRef[JReplicator.Changed[ReplicatedData]] @unchecked)) ⇒ + stopSubscribeAdapter(ref) + } } + withState(Map.empty) + } } } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala index 1f3e836045..6d828c1a25 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala @@ -26,7 +26,7 @@ object Replicator { def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] = ReplicatorBehavior.behavior(settings).narrow[Command[_]] - sealed trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] { + trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] { def key: Key[A] } @@ -254,4 +254,46 @@ object Replicator { def getRequest: Optional[Any] = request } + /** + * Register a subscriber that will be notified with a [[Changed]] message + * when the value of the given `key` is changed. Current value is also + * sent as a [[Changed]] message to a new subscriber. + * + * Subscribers will be notified periodically with the configured `notify-subscribers-interval`, + * and it is also possible to send an explicit `FlushChanges` message to + * the `Replicator` to notify the subscribers immediately. + * + * The subscriber will automatically be unregistered if it is terminated. + * + * If the key is deleted the subscriber is notified with a [[Deleted]] + * message. + */ + final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A] + /** + * Unregister a subscriber. + * + * @see [[Replicator.Subscribe]] + */ + final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A] + /** + * The data value is retrieved with [[#get]] using the typed key. + * + * @see [[Replicator.Subscribe]] + */ + final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) { + /** + * The data value, with correct type. + * Scala pattern matching cannot infer the type from the `key` parameter. + */ + def get[T <: ReplicatedData](key: Key[T]): T = { + require(key == this.key, "wrong key used, must use contained key") + data.asInstanceOf[T] + } + + /** + * The data value. Use [[#get]] to get the fully typed value. + */ + def dataValue: A = data + } + } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala index deeae84f24..c33b317439 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala @@ -34,9 +34,7 @@ object Replicator { } final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]]) - extends Command[A] { - - } + extends Command[A] type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A] object GetSuccess { @@ -97,4 +95,34 @@ object Replicator { type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A] type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A] + /** + * Register a subscriber that will be notified with a [[Changed]] message + * when the value of the given `key` is changed. Current value is also + * sent as a [[Changed]] message to a new subscriber. + * + * Subscribers will be notified periodically with the configured `notify-subscribers-interval`, + * and it is also possible to send an explicit `FlushChanges` message to + * the `Replicator` to notify the subscribers immediately. + * + * The subscriber will automatically be unregistered if it is terminated. + * + * If the key is deleted the subscriber is notified with a [[Deleted]] + * message. + */ + final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) + extends Command[A] + + /** + * Unregister a subscriber. + * + * @see [[Replicator.Subscribe]] + */ + final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) + extends Command[A] + + object Changed { + def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key) + } + type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A] + }