Subscribe

This commit is contained in:
Patrik Nordwall 2017-09-19 15:53:25 +02:00
parent 9496b59289
commit 20fce37665
5 changed files with 332 additions and 92 deletions

View file

@ -21,8 +21,11 @@ import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.javadsl.TestKit;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.cluster.ddata.javadsl.Replicator.Command;
import akka.typed.javadsl.Actor;
import akka.typed.javadsl.Adapter;
import akka.typed.javadsl.Actor.MutableBehavior;
import akka.typed.javadsl.ActorContext;
public class ReplicatorTest extends JUnitSuite {
@ -40,6 +43,14 @@ public class ReplicatorTest extends JUnitSuite {
}
}
static final class GetCachedValue implements ClientCommand {
final ActorRef<Integer> replyTo;
GetCachedValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
static interface InternalMsg extends ClientCommand {
}
@ -59,33 +70,64 @@ public class ReplicatorTest extends JUnitSuite {
}
}
static final class InternalChanged<A extends ReplicatedData> implements InternalMsg {
final Replicator.Changed<A> chg;
public InternalChanged(Replicator.Changed<A> chg) {
this.chg = chg;
}
}
static final Key<GCounter> Key = GCounterKey.create("counter");
static Behavior<ClientCommand> client(ActorRef<Replicator.Command<?>> replicator, Cluster node) {
return Actor.deferred(c -> {
static class Client extends MutableBehavior<ClientCommand> {
private final ActorRef<Replicator.Command<?>> replicator;
private final Cluster node;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter;
final ActorRef<Replicator.Changed<GCounter>> changedAdapter;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter =
c.spawnAdapter(m -> new InternalUpdateResponse<>(m));
private int cachedValue = 0;
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter =
c.spawnAdapter(m -> new InternalGetResponse<>(m));
public Client(ActorRef<Command<?>> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
this.replicator = replicator;
this.node = node;
return Actor.immutable(ClientCommand.class)
.onMessage(Increment.class, (ctx, cmd) -> {
updateResponseAdapter = ctx.spawnAdapter(m -> new InternalUpdateResponse<>(m));
getResponseAdapter = ctx.spawnAdapter(m -> new InternalGetResponse<>(m));
changedAdapter = ctx.spawnAdapter(m -> new InternalChanged<>(m));
replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}
public static Behavior<ClientCommand> create(ActorRef<Command<?>> replicator, Cluster node) {
return Actor.mutable(ctx -> new Client(replicator, node, ctx));
}
@Override
public Actor.Receive<ClientCommand> createReceive() {
return receiveBuilder()
.onMessage(Increment.class, cmd -> {
replicator.tell(
new Replicator.Update<GCounter>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter,
curr -> curr.increment(node, 1)));
return Actor.same();
return this;
})
.onMessage(InternalUpdateResponse.class, (ctx, msg) -> {
return Actor.same();
.onMessage(InternalUpdateResponse.class, msg -> {
return this;
})
.onMessage(GetValue.class, (ctx, cmd) -> {
.onMessage(GetValue.class, cmd -> {
replicator.tell(
new Replicator.Get<GCounter>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return Actor.same();
return this;
})
.onMessage(InternalGetResponse.class, (ctx, msg) -> {
.onMessage(GetCachedValue.class, cmd -> {
cmd.replyTo.tell(cachedValue);
return this;
})
.onMessage(InternalGetResponse.class, msg -> {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
@ -93,10 +135,15 @@ public class ReplicatorTest extends JUnitSuite {
} else {
// not dealing with failures
}
return Actor.same();
return this;
})
.onMessage(InternalChanged.class, msg -> {
GCounter counter = (GCounter) msg.chg.get(Key);
cachedValue = counter.getValue().intValue();
return this;
})
.build();
});
}
}
@ -118,17 +165,42 @@ public class ReplicatorTest extends JUnitSuite {
@Test
public void apiPrototype() {
public void shouldHaveApiForUpdateAndGet() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
Adapter.spawn(system, Replicator.behavior(settings), "replicator");
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, client(replicator, Cluster.get(system)));
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));
client.tell(new Increment());
client.tell(new GetValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(1);
}
@Test
public void shouldHaveApiForSubscribe() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));
client.tell(new Increment());
client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(2);
return null;
});
client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(3);
return null;
});
}
}

View file

@ -20,6 +20,7 @@ import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
object ReplicatorSpec {
@ -32,9 +33,11 @@ object ReplicatorSpec {
sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg
private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg
private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg
val Key = GCounterKey("counter")
@ -46,6 +49,12 @@ object ReplicatorSpec {
val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
ctx.spawnAdapter(InternalGetResponse.apply)
val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
ctx.spawnAdapter(InternalChanged.apply)
replicator ! Replicator.Subscribe(Key, changedAdapter)
def behavior(cachedValue: Int): Behavior[ClientCommand] = {
Actor.immutable[ClientCommand] { (ctx, msg)
msg match {
case Increment
@ -56,6 +65,10 @@ object ReplicatorSpec {
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same
case GetCachedValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same
case internal: InternalMsg internal match {
case InternalUpdateResponse(_) Actor.same // ok
@ -66,13 +79,21 @@ object ReplicatorSpec {
case InternalGetResponse(rsp)
Actor.unhandled // not dealing with failures
case InternalChanged(chg @ Replicator.Changed(Key))
val value = chg.get(Key).value.intValue
behavior(value)
}
}
}
}
behavior(cachedValue = 0)
}
}
class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
import ReplicatorSpec._
trait RealTests extends StartSupport {
@ -81,10 +102,8 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
val settings = ReplicatorSettings(system)
implicit val cluster = Cluster(system.toUntyped)
def `API prototype`(): Unit = {
def `have API for Update and Get`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))
val probe = TestProbe[Int]
@ -93,6 +112,24 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
probe.expectMsg(1)
}
def `have API for Subscribe`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))
val probe = TestProbe[Int]
c ! Increment
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(2)
}
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(3)
}
}
}
object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem

View file

@ -10,10 +10,14 @@ import scala.concurrent.duration.Duration
import akka.annotation.InternalApi
import akka.cluster.{ ddata dd }
import akka.pattern.ask
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.util.Timeout
import akka.cluster.ddata.ReplicatedData
import akka.cluster.ddata.Key
import akka.typed.Terminated
/**
* INTERNAL API
@ -22,6 +26,11 @@ import akka.util.Timeout
import akka.typed.cluster.ddata.javadsl.{ Replicator JReplicator }
import akka.typed.cluster.ddata.scaladsl.{ Replicator SReplicator }
private case class InternalChanged[A <: ReplicatedData](chg: dd.Replicator.Changed[A], subscriber: ActorRef[JReplicator.Changed[A]])
extends JReplicator.Command[A] {
override def key: Key[A] = chg.key
}
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
val additionalAskTimeout = 1.second
@ -32,6 +41,20 @@ import akka.util.Timeout
// FIXME perhaps add supervisor for restarting
val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying")
def withState(
subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command[_]] = {
def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command[_]] = {
subscribeAdapters.get(subscriber) match {
case Some(adapter)
// will be unsubscribed from untypedReplicator via Terminated
ctx.stop(adapter)
withState(subscribeAdapters - subscriber)
case None // already unsubscribed or terminated
Actor.same
}
}
Actor.immutable[SReplicator.Command[_]] { (ctx, msg)
msg match {
case cmd: SReplicator.Get[_]
@ -82,8 +105,46 @@ import akka.util.Timeout
}
reply.foreach { cmd.replyTo ! _ }
Actor.same
case cmd: SReplicator.Subscribe[_]
// For the Scala API the Changed messages can be sent directly to the subscriber
untypedReplicator.tell(
dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toUntyped),
sender = cmd.subscriber.toUntyped)
Actor.same
case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked
// For the Java API the Changed messages must be mapped to the JReplicator.Changed class.
// That is done with an adapter, and we have to keep track of the lifecycle of the original
// subscriber and stop the adapter when the original subscriber is stopped.
val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnAdapter {
chg InternalChanged(chg, cmd.subscriber)
}
untypedReplicator.tell(
dd.Replicator.Subscribe(cmd.key, adapter.toUntyped),
sender = akka.actor.ActorRef.noSender)
ctx.watch(cmd.subscriber)
withState(subscribeAdapters.updated(cmd.subscriber, adapter))
case InternalChanged(chg, subscriber)
subscriber ! JReplicator.Changed(chg.key)(chg.dataValue)
Actor.same
case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked
stopSubscribeAdapter(cmd.subscriber)
}
}
.onSignal {
case (ctx, Terminated(ref: ActorRef[JReplicator.Changed[ReplicatedData]] @unchecked))
stopSubscribeAdapter(ref)
}
}
withState(Map.empty)
}
}

View file

@ -26,7 +26,7 @@ object Replicator {
def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] =
ReplicatorBehavior.behavior(settings).narrow[Command[_]]
sealed trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] {
trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] {
def key: Key[A]
}
@ -254,4 +254,46 @@ object Replicator {
def getRequest: Optional[Any] = request
}
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
* Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
*
* The subscriber will automatically be unregistered if it is terminated.
*
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A]
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A]
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
}

View file

@ -34,9 +34,7 @@ object Replicator {
}
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]])
extends Command[A] {
}
extends Command[A]
type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A]
object GetSuccess {
@ -97,4 +95,34 @@ object Replicator {
type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A]
type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A]
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
* Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
*
* The subscriber will automatically be unregistered if it is terminated.
*
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]])
extends Command[A]
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]])
extends Command[A]
object Changed {
def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key)
}
type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A]
}