From 223f84e8b9a8d72122abf0f45c3746d013768279 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 8 Sep 2019 19:56:04 +0200 Subject: [PATCH] doc: stylish distributed-data.md, #24717 * move to docs package * style changes --- .../ddata/typed/javadsl/ReplicatorTest.java | 242 ----------------- .../typed/javadsl/ReplicatorDocSample.java | 168 ++++++++++++ .../typed/javadsl/ReplicatorDocTest.java | 92 +++++++ .../scaladsl/ReplicatorCompileOnlyTest.scala | 117 +++++++++ .../ddata/typed/scaladsl/ReplicatorSpec.scala | 244 +----------------- .../typed/scaladsl/ReplicatorDocSpec.scala | 158 ++++++++++++ .../main/paradox/typed/distributed-data.md | 8 +- 7 files changed, 543 insertions(+), 486 deletions(-) delete mode 100644 akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java create mode 100644 akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java create mode 100644 akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java create mode 100644 akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala create mode 100644 akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java deleted file mode 100644 index b506a9ff78..0000000000 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Copyright (C) 2017-2019 Lightbend Inc. - */ - -package akka.cluster.ddata.typed.javadsl; - -// FIXME move to doc package - -import akka.actor.testkit.typed.javadsl.LogCapturing; -import akka.actor.testkit.typed.javadsl.TestKitJunitResource; -import akka.actor.testkit.typed.javadsl.TestProbe; -import akka.cluster.ddata.*; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; - -// #sample -import akka.actor.typed.ActorRef; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Receive; - -import static org.junit.Assert.assertEquals; - -// #sample - -public class ReplicatorTest extends JUnitSuite { - - // #sample - interface ClientCommand {} - - enum Increment implements ClientCommand { - INSTANCE - } - - static final class GetValue implements ClientCommand { - final ActorRef replyTo; - - GetValue(ActorRef replyTo) { - this.replyTo = replyTo; - } - } - - static final class GetCachedValue implements ClientCommand { - final ActorRef replyTo; - - GetCachedValue(ActorRef replyTo) { - this.replyTo = replyTo; - } - } - - private interface InternalMsg extends ClientCommand {} - - private static final class InternalUpdateResponse implements InternalMsg { - final Replicator.UpdateResponse rsp; - - InternalUpdateResponse(Replicator.UpdateResponse rsp) { - this.rsp = rsp; - } - } - - private static final class InternalGetResponse implements InternalMsg { - final Replicator.GetResponse rsp; - final ActorRef replyTo; - - InternalGetResponse(Replicator.GetResponse rsp, ActorRef replyTo) { - this.rsp = rsp; - this.replyTo = replyTo; - } - } - - private static final class InternalSubscribeResponse implements InternalMsg { - final Replicator.SubscribeResponse rsp; - - InternalSubscribeResponse(Replicator.SubscribeResponse rsp) { - this.rsp = rsp; - } - } - - static class Counter extends AbstractBehavior { - private final ActorContext context; - // adapter that turns the response messages from the replicator into our own protocol - private final ReplicatorMessageAdapter replicatorAdapter; - private final SelfUniqueAddress node; - private final Key key; - - private int cachedValue = 0; - - Counter( - ActorContext ctx, - ReplicatorMessageAdapter replicatorAdapter, - Key key) { - - context = ctx; - this.replicatorAdapter = replicatorAdapter; - this.key = key; - - node = DistributedData.get(ctx.getSystem()).selfUniqueAddress(); - - this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new); - } - - public static Behavior create(Key key) { - return Behaviors.setup( - ctx -> - DistributedData.withReplicatorMessageAdapter( - (ReplicatorMessageAdapter replicatorAdapter) -> - new Counter(ctx, replicatorAdapter, key))); - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage(Increment.class, this::onIncrement) - .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same()) - .onMessage(GetValue.class, this::onGetValue) - .onMessage(GetCachedValue.class, this::onGetCachedValue) - .onMessage(InternalGetResponse.class, this::onInternalGetResponse) - .onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse) - .build(); - } - - private Behavior onIncrement(Increment cmd) { - replicatorAdapter.askUpdate( - askReplyTo -> - new Replicator.Update<>( - key, - GCounter.empty(), - Replicator.writeLocal(), - askReplyTo, - curr -> curr.increment(node, 1)), - InternalUpdateResponse::new); - - return this; - } - - private Behavior onGetValue(GetValue cmd) { - replicatorAdapter.askGet( - askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo), - rsp -> new InternalGetResponse(rsp, cmd.replyTo)); - - return this; - } - - private Behavior onGetCachedValue(GetCachedValue cmd) { - cmd.replyTo.tell(cachedValue); - return this; - } - - private Behavior onInternalGetResponse(InternalGetResponse msg) { - if (msg.rsp instanceof Replicator.GetSuccess) { - int value = ((Replicator.GetSuccess) msg.rsp).get(key).getValue().intValue(); - msg.replyTo.tell(value); - return this; - } else { - // not dealing with failures - return Behaviors.unhandled(); - } - } - - private Behavior onInternalSubscribeResponse(InternalSubscribeResponse msg) { - if (msg.rsp instanceof Replicator.Changed) { - GCounter counter = ((Replicator.Changed) msg.rsp).get(key); - cachedValue = counter.getValue().intValue(); - return this; - } else { - // no deletes - return Behaviors.unhandled(); - } - } - } - - // #sample - - static Config config = - ConfigFactory.parseString( - "akka.actor.provider = cluster \n" - + "akka.remote.classic.netty.tcp.port = 0 \n" - + "akka.remote.artery.canonical.port = 0 \n" - + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n"); - - @ClassRule public static TestKitJunitResource testKit = new TestKitJunitResource(config); - - @Rule public final LogCapturing logCapturing = new LogCapturing(); - - @Test - public void shouldHaveApiForUpdateAndGet() { - TestProbe probe = testKit.createTestProbe(Integer.class); - ActorRef client = testKit.spawn(Counter.create(GCounterKey.create("counter1"))); - - client.tell(Increment.INSTANCE); - client.tell(new GetValue(probe.getRef())); - probe.expectMessage(1); - } - - @Test - public void shouldHaveApiForSubscribe() { - TestProbe probe = testKit.createTestProbe(Integer.class); - ActorRef client = testKit.spawn(Counter.create(GCounterKey.create("counter2"))); - - client.tell(Increment.INSTANCE); - client.tell(Increment.INSTANCE); - probe.awaitAssert( - () -> { - client.tell(new GetCachedValue(probe.getRef())); - probe.expectMessage(2); - return null; - }); - - client.tell(Increment.INSTANCE); - probe.awaitAssert( - () -> { - client.tell(new GetCachedValue(probe.getRef())); - probe.expectMessage(3); - return null; - }); - } - - @Test - public void shouldHaveAnExtension() { - Key key = GCounterKey.create("counter3"); - ActorRef client = testKit.spawn(Counter.create(key)); - - TestProbe probe = testKit.createTestProbe(Integer.class); - client.tell(Increment.INSTANCE); - client.tell(new GetValue(probe.getRef())); - probe.expectMessage(1); - - TestProbe> getReplyProbe = testKit.createTestProbe(); - ActorRef replicator = DistributedData.get(testKit.system()).replicator(); - replicator.tell(new Replicator.Get<>(key, Replicator.readLocal(), getReplyProbe.getRef())); - @SuppressWarnings("unchecked") - Replicator.GetSuccess rsp = - getReplyProbe.expectMessageClass(Replicator.GetSuccess.class); - assertEquals(1, rsp.get(key).getValue().intValue()); - } -} diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java new file mode 100644 index 0000000000..e060479825 --- /dev/null +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.akka.cluster.ddata.typed.javadsl; + +// #sample +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.GCounter; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.SelfUniqueAddress; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; + +// #sample + +interface ReplicatorDocSample { + // #sample + public class Counter extends AbstractBehavior { + interface Command {} + + enum Increment implements Command { + INSTANCE + } + + public static class GetValue implements Command { + public final ActorRef replyTo; + + public GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static class GetCachedValue implements Command { + public final ActorRef replyTo; + + public GetCachedValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + private interface InternalCommand extends Command {} + + private static class InternalUpdateResponse implements InternalCommand { + final Replicator.UpdateResponse rsp; + + InternalUpdateResponse(Replicator.UpdateResponse rsp) { + this.rsp = rsp; + } + } + + private static class InternalGetResponse implements InternalCommand { + final Replicator.GetResponse rsp; + final ActorRef replyTo; + + InternalGetResponse(Replicator.GetResponse rsp, ActorRef replyTo) { + this.rsp = rsp; + this.replyTo = replyTo; + } + } + + private static final class InternalSubscribeResponse implements InternalCommand { + final Replicator.SubscribeResponse rsp; + + InternalSubscribeResponse(Replicator.SubscribeResponse rsp) { + this.rsp = rsp; + } + } + + public static Behavior create(Key key) { + return Behaviors.setup( + ctx -> + DistributedData.withReplicatorMessageAdapter( + (ReplicatorMessageAdapter replicatorAdapter) -> + new Counter(ctx, replicatorAdapter, key))); + } + + private final ActorContext context; + // adapter that turns the response messages from the replicator into our own protocol + private final ReplicatorMessageAdapter replicatorAdapter; + private final SelfUniqueAddress node; + private final Key key; + + private int cachedValue = 0; + + private Counter( + ActorContext context, + ReplicatorMessageAdapter replicatorAdapter, + Key key) { + + this.context = context; + this.replicatorAdapter = replicatorAdapter; + this.key = key; + + this.node = DistributedData.get(context.getSystem()).selfUniqueAddress(); + + this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Increment.class, this::onIncrement) + .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same()) + .onMessage(GetValue.class, this::onGetValue) + .onMessage(GetCachedValue.class, this::onGetCachedValue) + .onMessage(InternalGetResponse.class, this::onInternalGetResponse) + .onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse) + .build(); + } + + private Behavior onIncrement(Increment cmd) { + replicatorAdapter.askUpdate( + askReplyTo -> + new Replicator.Update<>( + key, + GCounter.empty(), + Replicator.writeLocal(), + askReplyTo, + curr -> curr.increment(node, 1)), + InternalUpdateResponse::new); + + return this; + } + + private Behavior onGetValue(GetValue cmd) { + replicatorAdapter.askGet( + askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo), + rsp -> new InternalGetResponse(rsp, cmd.replyTo)); + + return this; + } + + private Behavior onGetCachedValue(GetCachedValue cmd) { + cmd.replyTo.tell(cachedValue); + return this; + } + + private Behavior onInternalGetResponse(InternalGetResponse msg) { + if (msg.rsp instanceof Replicator.GetSuccess) { + int value = ((Replicator.GetSuccess) msg.rsp).get(key).getValue().intValue(); + msg.replyTo.tell(value); + return this; + } else { + // not dealing with failures + return Behaviors.unhandled(); + } + } + + private Behavior onInternalSubscribeResponse(InternalSubscribeResponse msg) { + if (msg.rsp instanceof Replicator.Changed) { + GCounter counter = ((Replicator.Changed) msg.rsp).get(key); + cachedValue = counter.getValue().intValue(); + return this; + } else { + // no deletes + return Behaviors.unhandled(); + } + } + } +} +// #sample diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java new file mode 100644 index 0000000000..8f349cffbb --- /dev/null +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2017-2019 Lightbend Inc. + */ + +package jdocs.akka.cluster.ddata.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.GCounter; +import akka.cluster.ddata.GCounterKey; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import static jdocs.akka.cluster.ddata.typed.javadsl.ReplicatorDocSample.Counter; +import static org.junit.Assert.assertEquals; + +public class ReplicatorDocTest extends JUnitSuite { + + static Config config = + ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.classic.netty.tcp.port = 0 \n" + + "akka.remote.artery.canonical.port = 0 \n" + + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n"); + + @ClassRule public static TestKitJunitResource testKit = new TestKitJunitResource(config); + + @Rule public final LogCapturing logCapturing = new LogCapturing(); + + @Test + public void shouldHaveApiForUpdateAndGet() { + TestProbe probe = testKit.createTestProbe(Integer.class); + ActorRef client = + testKit.spawn(Counter.create(GCounterKey.create("counter1"))); + + client.tell(Counter.Increment.INSTANCE); + client.tell(new Counter.GetValue(probe.getRef())); + probe.expectMessage(1); + } + + @Test + public void shouldHaveApiForSubscribe() { + TestProbe probe = testKit.createTestProbe(Integer.class); + ActorRef client = + testKit.spawn(Counter.create(GCounterKey.create("counter2"))); + + client.tell(Counter.Increment.INSTANCE); + client.tell(Counter.Increment.INSTANCE); + probe.awaitAssert( + () -> { + client.tell(new Counter.GetCachedValue(probe.getRef())); + probe.expectMessage(2); + return null; + }); + + client.tell(Counter.Increment.INSTANCE); + probe.awaitAssert( + () -> { + client.tell(new Counter.GetCachedValue(probe.getRef())); + probe.expectMessage(3); + return null; + }); + } + + @Test + public void shouldHaveAnExtension() { + Key key = GCounterKey.create("counter3"); + ActorRef client = testKit.spawn(Counter.create(key)); + + TestProbe probe = testKit.createTestProbe(Integer.class); + client.tell(Counter.Increment.INSTANCE); + client.tell(new Counter.GetValue(probe.getRef())); + probe.expectMessage(1); + + TestProbe> getReplyProbe = testKit.createTestProbe(); + ActorRef replicator = DistributedData.get(testKit.system()).replicator(); + replicator.tell(new Replicator.Get<>(key, Replicator.readLocal(), getReplyProbe.getRef())); + @SuppressWarnings("unchecked") + Replicator.GetSuccess rsp = + getReplyProbe.expectMessageClass(Replicator.GetSuccess.class); + assertEquals(1, rsp.get(key).getValue().intValue()); + } +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala new file mode 100644 index 0000000000..4bc0c699e5 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.ddata.typed.scaladsl + +import scala.concurrent.Future +import scala.concurrent.duration._ + +import akka.actor.typed.ActorRef +import akka.actor.typed.Scheduler +import akka.cluster.ddata.GCounter +import akka.cluster.ddata.GCounterKey +import akka.cluster.ddata.SelfUniqueAddress +import akka.cluster.ddata.typed.scaladsl.Replicator._ +import akka.util.Timeout + +object ReplicatorCompileOnlyTest { + sealed trait ClientCommand + private sealed trait InternalMsg extends ClientCommand + private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg + private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) + extends InternalMsg + + def shouldHaveConvenienceForAsk(): Unit = { + import akka.actor.typed.scaladsl.AskPattern._ + + val replicator: ActorRef[Replicator.Command] = ??? + implicit val timeout = Timeout(3.seconds) + implicit val scheduler: Scheduler = ??? + implicit val cluster: SelfUniqueAddress = ??? + val key = GCounterKey("counter") + + val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(key, Replicator.ReadLocal)) + + val reply2: Future[UpdateResponse[GCounter]] = + replicator.ask(Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1)) + + val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(key, Replicator.WriteLocal)) + + val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount()) + + // suppress unused compiler warnings + println("" + reply1 + reply2 + reply3 + reply4) + } + + def shouldHaveConvenienceForAsk2(): Unit = { + implicit val cluster: SelfUniqueAddress = ??? + val replicatorAdapter: ReplicatorMessageAdapter[ClientCommand, GCounter] = ??? + val replyTo: ActorRef[Int] = ??? + val key = GCounterKey("counter") + + //#curried-update + // alternative way to define the `createRequest` function + // Replicator.Update instance has a curried `apply` method + replicatorAdapter.askUpdate( + Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1), + InternalUpdateResponse.apply) + + // that is the same as + replicatorAdapter.askUpdate( + askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1), + InternalUpdateResponse.apply) + //#curried-update + + //#curried-get + // alternative way to define the `createRequest` function + // Replicator.Get instance has a curried `apply` method + replicatorAdapter.askGet(Replicator.Get(key, Replicator.ReadLocal), value => InternalGetResponse(value, replyTo)) + + // that is the same as + replicatorAdapter.askGet( + askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo), + value => InternalGetResponse(value, replyTo)) + //#curried-get + } + + def shouldHaveUnapplyForResponseTypes(): Unit = { + val getResponse: GetResponse[GCounter] = ??? + val key = GCounterKey("counter") + + getResponse match { + case GetSuccess(`key`) => + case GetFailure(`key`) => + case NotFound(`key`) => + case GetDataDeleted(`key`) => + } + + val updateResponse: UpdateResponse[GCounter] = ??? + updateResponse match { + case UpdateSuccess(`key`) => + case ModifyFailure(`key`, _, _) => + case UpdateTimeout(`key`) => + case StoreFailure(`key`) => + case UpdateFailure(`key`) => + case UpdateDataDeleted(`key`) => + } + + val deleteResponse: DeleteResponse[GCounter] = ??? + deleteResponse match { + case DeleteSuccess(`key`) => + case DeleteFailure(`key`) => + case DataDeleted(`key`) => + } + + val subscribeResponse: SubscribeResponse[GCounter] = ??? + subscribeResponse match { + case Changed(`key`) => + case Deleted(`key`) => + } + + val replicaCount: ReplicaCount = ??? + replicaCount match { + case ReplicaCount(_) => + } + } +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index 09db868db1..e0066f3c4a 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -1,251 +1,15 @@ /* - * Copyright (C) 2017-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ package akka.cluster.ddata.typed.scaladsl -// FIXME move to doc package - +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import org.scalatest.WordSpecLike -import akka.cluster.ddata.SelfUniqueAddress - -// #sample -import akka.actor.typed.Scheduler -import akka.actor.typed.{ ActorRef, Behavior } -import akka.actor.typed.scaladsl.Behaviors -import akka.cluster.ddata.typed.scaladsl.Replicator._ -import akka.cluster.ddata.{ GCounter, GCounterKey } -import akka.actor.testkit.typed.scaladsl._ -import akka.util.Timeout -import com.typesafe.config.ConfigFactory - -import scala.concurrent.Future -import scala.concurrent.duration._ - -// #sample - -object ReplicatorSpec { - - val config = ConfigFactory.parseString(""" - akka.actor.provider = "cluster" - akka.remote.classic.netty.tcp.port = 0 - akka.remote.artery.canonical.port = 0 - akka.remote.artery.canonical.hostname = 127.0.0.1 - """) - - // #sample - sealed trait ClientCommand - final case object Increment extends ClientCommand - final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand - final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand - private sealed trait InternalMsg extends ClientCommand - private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg - private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) - extends InternalMsg - private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalMsg - - def client(key: GCounterKey): Behavior[ClientCommand] = - Behaviors.setup[ClientCommand] { ctx => - implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress - - // adapter that turns the response messages from the replicator into our own protocol - DistributedData.withReplicatorMessageAdapter[ClientCommand, GCounter] { replicatorAdapter => - replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply) - - def behavior(cachedValue: Int): Behavior[ClientCommand] = { - Behaviors.receiveMessage[ClientCommand] { - case Increment => - replicatorAdapter.askUpdate( - askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1), - InternalUpdateResponse.apply) - - Behaviors.same - - case GetValue(replyTo) => - replicatorAdapter.askGet( - askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo), - value => InternalGetResponse(value, replyTo)) - - Behaviors.same - - case GetCachedValue(replyTo) => - replyTo ! cachedValue - Behaviors.same - - case internal: InternalMsg => - internal match { - case InternalUpdateResponse(_) => Behaviors.same // ok - - case InternalGetResponse(rsp @ Replicator.GetSuccess(`key`), replyTo) => - val value = rsp.get(key).value.toInt - replyTo ! value - Behaviors.same - - case InternalGetResponse(_, _) => - Behaviors.unhandled // not dealing with failures - - case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) => - val value = chg.get(key).value.intValue - behavior(value) - - case InternalSubscribeResponse(Replicator.Deleted(_)) => - Behaviors.unhandled // no deletes - } - } - } - - behavior(cachedValue = 0) - } - } - // #sample - - object CompileOnlyTest { - def shouldHaveConvenienceForAsk(): Unit = { - import akka.actor.typed.scaladsl.AskPattern._ - - val replicator: ActorRef[Replicator.Command] = ??? - implicit val timeout = Timeout(3.seconds) - implicit val scheduler: Scheduler = ??? - implicit val cluster: SelfUniqueAddress = ??? - val key = GCounterKey("counter") - - val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(key, Replicator.ReadLocal)) - - val reply2: Future[UpdateResponse[GCounter]] = - replicator.ask(Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1)) - - val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(key, Replicator.WriteLocal)) - - val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount()) - - // suppress unused compiler warnings - println("" + reply1 + reply2 + reply3 + reply4) - } - - def shouldHaveConvenienceForAsk2(): Unit = { - implicit val cluster: SelfUniqueAddress = ??? - val replicatorAdapter: ReplicatorMessageAdapter[ClientCommand, GCounter] = ??? - val replyTo: ActorRef[Int] = ??? - val key = GCounterKey("counter") - - //#curried-update - // alternative way to define the `createRequest` function - // Replicator.Update instance has a curried `apply` method - replicatorAdapter.askUpdate( - Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1), - InternalUpdateResponse.apply) - - // that is the same as - replicatorAdapter.askUpdate( - askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1), - InternalUpdateResponse.apply) - //#curried-update - - //#curried-get - // alternative way to define the `createRequest` function - // Replicator.Get instance has a curried `apply` method - replicatorAdapter.askGet(Replicator.Get(key, Replicator.ReadLocal), value => InternalGetResponse(value, replyTo)) - - // that is the same as - replicatorAdapter.askGet( - askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo), - value => InternalGetResponse(value, replyTo)) - //#curried-get - } - - def shouldHaveUnapplyForResponseTypes(): Unit = { - val getResponse: GetResponse[GCounter] = ??? - val key = GCounterKey("counter") - - getResponse match { - case GetSuccess(`key`) => - case GetFailure(`key`) => - case NotFound(`key`) => - case GetDataDeleted(`key`) => - } - - val updateResponse: UpdateResponse[GCounter] = ??? - updateResponse match { - case UpdateSuccess(`key`) => - case ModifyFailure(`key`, _, _) => - case UpdateTimeout(`key`) => - case StoreFailure(`key`) => - case UpdateFailure(`key`) => - case UpdateDataDeleted(`key`) => - } - - val deleteResponse: DeleteResponse[GCounter] = ??? - deleteResponse match { - case DeleteSuccess(`key`) => - case DeleteFailure(`key`) => - case DataDeleted(`key`) => - } - - val subscribeResponse: SubscribeResponse[GCounter] = ??? - subscribeResponse match { - case Changed(`key`) => - case Deleted(`key`) => - } - - val replicaCount: ReplicaCount = ??? - replicaCount match { - case ReplicaCount(_) => - } - } - } - -} - -class ReplicatorSpec extends ScalaTestWithActorTestKit(ReplicatorSpec.config) with WordSpecLike with LogCapturing { - - import ReplicatorSpec._ - - implicit val selfNodeAddress = DistributedData(system).selfUniqueAddress +class ReplicatorSpec extends ScalaTestWithActorTestKit with WordSpecLike with LogCapturing { "Replicator" must { - - "have API for Update and Get" in { - val c = spawn(client(GCounterKey("counter1"))) - - val probe = createTestProbe[Int]() - c ! Increment - c ! GetValue(probe.ref) - probe.expectMessage(1) - } - - "have API for Subscribe" in { - val c = spawn(client(GCounterKey("counter2"))) - - val probe = createTestProbe[Int]() - c ! Increment - c ! Increment - eventually { - c ! GetCachedValue(probe.ref) - probe.expectMessage(2) - } - c ! Increment - eventually { - c ! GetCachedValue(probe.ref) - probe.expectMessage(3) - } - } - - "have an extension" in { - val key = GCounterKey("counter3") - val c = spawn(client(key)) - - val probe = createTestProbe[Int]() - c ! Increment - c ! GetValue(probe.ref) - probe.expectMessage(1) - - val getReplyProbe = createTestProbe[Replicator.GetResponse[GCounter]]() - val replicator = DistributedData(system).replicator - replicator ! Replicator.Get(key, Replicator.ReadLocal, getReplyProbe.ref) - val rsp = getReplyProbe.expectMessageType[GetSuccess[GCounter]] - rsp.get(key).value.toInt should ===(1) - } - "have the prefixed replicator name" in { ReplicatorSettings.name(system) should ===("typedDdataReplicator") } diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala new file mode 100644 index 0000000000..281a4cac61 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2017-2019 Lightbend Inc. + */ + +package docs.akka.cluster.ddata.typed.scaladsl + +import akka.cluster.ddata.SelfUniqueAddress +import akka.cluster.ddata.typed.scaladsl.DistributedData +import akka.cluster.ddata.typed.scaladsl.Replicator +import org.scalatest.WordSpecLike + +import akka.actor.testkit.typed.scaladsl._ +import com.typesafe.config.ConfigFactory + +// #sample +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.ddata.GCounter +import akka.cluster.ddata.GCounterKey +import akka.cluster.ddata.typed.scaladsl.Replicator._ + +// #sample + +object ReplicatorDocSpec { + + val config = ConfigFactory.parseString(""" + akka.actor.provider = cluster + akka.remote.classic.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + """) + + // #sample + object Counter { + sealed trait Command + final case object Increment extends Command + final case class GetValue(replyTo: ActorRef[Int]) extends Command + final case class GetCachedValue(replyTo: ActorRef[Int]) extends Command + private sealed trait InternalCommand extends Command + private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalCommand + private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) + extends InternalCommand + private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalCommand + + def apply(key: GCounterKey): Behavior[Command] = + Behaviors.setup[Command] { ctx => + implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress + + // adapter that turns the response messages from the replicator into our own protocol + DistributedData.withReplicatorMessageAdapter[Command, GCounter] { replicatorAdapter => + replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply) + + def updated(cachedValue: Int): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Increment => + replicatorAdapter.askUpdate( + askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1), + InternalUpdateResponse.apply) + + Behaviors.same + + case GetValue(replyTo) => + replicatorAdapter.askGet( + askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo), + value => InternalGetResponse(value, replyTo)) + + Behaviors.same + + case GetCachedValue(replyTo) => + replyTo ! cachedValue + Behaviors.same + + case internal: InternalCommand => + internal match { + case InternalUpdateResponse(_) => Behaviors.same // ok + + case InternalGetResponse(rsp @ Replicator.GetSuccess(`key`), replyTo) => + val value = rsp.get(key).value.toInt + replyTo ! value + Behaviors.same + + case InternalGetResponse(_, _) => + Behaviors.unhandled // not dealing with failures + + case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) => + val value = chg.get(key).value.intValue + updated(value) + + case InternalSubscribeResponse(Replicator.Deleted(_)) => + Behaviors.unhandled // no deletes + } + } + } + + updated(cachedValue = 0) + } + } + } + // #sample + +} + +class ReplicatorDocSpec + extends ScalaTestWithActorTestKit(ReplicatorDocSpec.config) + with WordSpecLike + with LogCapturing { + + import ReplicatorDocSpec._ + + implicit val selfNodeAddress = DistributedData(system).selfUniqueAddress + + "Replicator" must { + + "have API for Update and Get" in { + val c = spawn(Counter(GCounterKey("counter1"))) + + val probe = createTestProbe[Int]() + c ! Counter.Increment + c ! Counter.GetValue(probe.ref) + probe.expectMessage(1) + } + + "have API for Subscribe" in { + val c = spawn(Counter(GCounterKey("counter2"))) + + val probe = createTestProbe[Int]() + c ! Counter.Increment + c ! Counter.Increment + eventually { + c ! Counter.GetCachedValue(probe.ref) + probe.expectMessage(2) + } + c ! Counter.Increment + eventually { + c ! Counter.GetCachedValue(probe.ref) + probe.expectMessage(3) + } + } + + "have an extension" in { + val key = GCounterKey("counter3") + val c = spawn(Counter(key)) + + val probe = createTestProbe[Int]() + c ! Counter.Increment + c ! Counter.GetValue(probe.ref) + probe.expectMessage(1) + + val getReplyProbe = createTestProbe[Replicator.GetResponse[GCounter]]() + val replicator = DistributedData(system).replicator + replicator ! Replicator.Get(key, Replicator.ReadLocal, getReplyProbe.ref) + val rsp = getReplyProbe.expectMessageType[GetSuccess[GCounter]] + rsp.get(key).value.toInt should ===(1) + } + + } +} diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md index 1988bfa04d..f5a020502b 100644 --- a/akka-docs/src/main/paradox/typed/distributed-data.md +++ b/akka-docs/src/main/paradox/typed/distributed-data.md @@ -50,10 +50,10 @@ This sample uses the replicated data type `GCounter` to implement a counter that cluster: Scala -: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #sample } +: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala) { #sample } Java -: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java) { #sample } +: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java) { #sample } Although you can interact with the `Replicator` using the @scala[`ActorRef[Replicator.Command]`]@java[`ActorRef`] from @scala[`DistributedData(ctx.system).replicator`]@java[`DistributedData(ctx.getSystem()).replicator()`] it's @@ -86,12 +86,12 @@ for more details about `Get`, `Update` and `Delete` interactions with the replic There is alternative way of constructing the function for the `Update` message: Scala -: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #curried-update } +: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala) { #curried-update } Similar is supported for `Get` and `Delete`: Scala -: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #curried-get } +: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala) { #curried-get } @@@