Merge pull request #27659 from akka/wip-24717-doc-apply-style13-patriknw
doc: stylish distributed-data.md, #24717
This commit is contained in:
commit
39d2db9abb
7 changed files with 543 additions and 486 deletions
|
|
@ -1,242 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.ddata.typed.javadsl;
|
||||
|
||||
// FIXME move to doc package
|
||||
|
||||
import akka.actor.testkit.typed.javadsl.LogCapturing;
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import akka.cluster.ddata.*;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
// #sample
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
||||
import akka.actor.typed.javadsl.ActorContext;
|
||||
import akka.actor.typed.javadsl.Receive;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
// #sample
|
||||
|
||||
public class ReplicatorTest extends JUnitSuite {
|
||||
|
||||
// #sample
|
||||
interface ClientCommand {}
|
||||
|
||||
enum Increment implements ClientCommand {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
static final class GetValue implements ClientCommand {
|
||||
final ActorRef<Integer> replyTo;
|
||||
|
||||
GetValue(ActorRef<Integer> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
static final class GetCachedValue implements ClientCommand {
|
||||
final ActorRef<Integer> replyTo;
|
||||
|
||||
GetCachedValue(ActorRef<Integer> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
private interface InternalMsg extends ClientCommand {}
|
||||
|
||||
private static final class InternalUpdateResponse implements InternalMsg {
|
||||
final Replicator.UpdateResponse<GCounter> rsp;
|
||||
|
||||
InternalUpdateResponse(Replicator.UpdateResponse<GCounter> rsp) {
|
||||
this.rsp = rsp;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class InternalGetResponse implements InternalMsg {
|
||||
final Replicator.GetResponse<GCounter> rsp;
|
||||
final ActorRef<Integer> replyTo;
|
||||
|
||||
InternalGetResponse(Replicator.GetResponse<GCounter> rsp, ActorRef<Integer> replyTo) {
|
||||
this.rsp = rsp;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class InternalSubscribeResponse implements InternalMsg {
|
||||
final Replicator.SubscribeResponse<GCounter> rsp;
|
||||
|
||||
InternalSubscribeResponse(Replicator.SubscribeResponse<GCounter> rsp) {
|
||||
this.rsp = rsp;
|
||||
}
|
||||
}
|
||||
|
||||
static class Counter extends AbstractBehavior<ClientCommand> {
|
||||
private final ActorContext<ClientCommand> context;
|
||||
// adapter that turns the response messages from the replicator into our own protocol
|
||||
private final ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter;
|
||||
private final SelfUniqueAddress node;
|
||||
private final Key<GCounter> key;
|
||||
|
||||
private int cachedValue = 0;
|
||||
|
||||
Counter(
|
||||
ActorContext<ClientCommand> ctx,
|
||||
ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter,
|
||||
Key<GCounter> key) {
|
||||
|
||||
context = ctx;
|
||||
this.replicatorAdapter = replicatorAdapter;
|
||||
this.key = key;
|
||||
|
||||
node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
|
||||
|
||||
this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new);
|
||||
}
|
||||
|
||||
public static Behavior<ClientCommand> create(Key<GCounter> key) {
|
||||
return Behaviors.setup(
|
||||
ctx ->
|
||||
DistributedData.withReplicatorMessageAdapter(
|
||||
(ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter) ->
|
||||
new Counter(ctx, replicatorAdapter, key)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive<ClientCommand> createReceive() {
|
||||
return newReceiveBuilder()
|
||||
.onMessage(Increment.class, this::onIncrement)
|
||||
.onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
|
||||
.onMessage(GetValue.class, this::onGetValue)
|
||||
.onMessage(GetCachedValue.class, this::onGetCachedValue)
|
||||
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
|
||||
.onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse)
|
||||
.build();
|
||||
}
|
||||
|
||||
private Behavior<ClientCommand> onIncrement(Increment cmd) {
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo ->
|
||||
new Replicator.Update<>(
|
||||
key,
|
||||
GCounter.empty(),
|
||||
Replicator.writeLocal(),
|
||||
askReplyTo,
|
||||
curr -> curr.increment(node, 1)),
|
||||
InternalUpdateResponse::new);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<ClientCommand> onGetValue(GetValue cmd) {
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo),
|
||||
rsp -> new InternalGetResponse(rsp, cmd.replyTo));
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<ClientCommand> onGetCachedValue(GetCachedValue cmd) {
|
||||
cmd.replyTo.tell(cachedValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<ClientCommand> onInternalGetResponse(InternalGetResponse msg) {
|
||||
if (msg.rsp instanceof Replicator.GetSuccess) {
|
||||
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
|
||||
msg.replyTo.tell(value);
|
||||
return this;
|
||||
} else {
|
||||
// not dealing with failures
|
||||
return Behaviors.unhandled();
|
||||
}
|
||||
}
|
||||
|
||||
private Behavior<ClientCommand> onInternalSubscribeResponse(InternalSubscribeResponse msg) {
|
||||
if (msg.rsp instanceof Replicator.Changed) {
|
||||
GCounter counter = ((Replicator.Changed<?>) msg.rsp).get(key);
|
||||
cachedValue = counter.getValue().intValue();
|
||||
return this;
|
||||
} else {
|
||||
// no deletes
|
||||
return Behaviors.unhandled();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #sample
|
||||
|
||||
static Config config =
|
||||
ConfigFactory.parseString(
|
||||
"akka.actor.provider = cluster \n"
|
||||
+ "akka.remote.classic.netty.tcp.port = 0 \n"
|
||||
+ "akka.remote.artery.canonical.port = 0 \n"
|
||||
+ "akka.remote.artery.canonical.hostname = 127.0.0.1 \n");
|
||||
|
||||
@ClassRule public static TestKitJunitResource testKit = new TestKitJunitResource(config);
|
||||
|
||||
@Rule public final LogCapturing logCapturing = new LogCapturing();
|
||||
|
||||
@Test
|
||||
public void shouldHaveApiForUpdateAndGet() {
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
ActorRef<ClientCommand> client = testKit.spawn(Counter.create(GCounterKey.create("counter1")));
|
||||
|
||||
client.tell(Increment.INSTANCE);
|
||||
client.tell(new GetValue(probe.getRef()));
|
||||
probe.expectMessage(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHaveApiForSubscribe() {
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
ActorRef<ClientCommand> client = testKit.spawn(Counter.create(GCounterKey.create("counter2")));
|
||||
|
||||
client.tell(Increment.INSTANCE);
|
||||
client.tell(Increment.INSTANCE);
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
client.tell(new GetCachedValue(probe.getRef()));
|
||||
probe.expectMessage(2);
|
||||
return null;
|
||||
});
|
||||
|
||||
client.tell(Increment.INSTANCE);
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
client.tell(new GetCachedValue(probe.getRef()));
|
||||
probe.expectMessage(3);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHaveAnExtension() {
|
||||
Key<GCounter> key = GCounterKey.create("counter3");
|
||||
ActorRef<ClientCommand> client = testKit.spawn(Counter.create(key));
|
||||
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
client.tell(Increment.INSTANCE);
|
||||
client.tell(new GetValue(probe.getRef()));
|
||||
probe.expectMessage(1);
|
||||
|
||||
TestProbe<Replicator.GetResponse<GCounter>> getReplyProbe = testKit.createTestProbe();
|
||||
ActorRef<Replicator.Command> replicator = DistributedData.get(testKit.system()).replicator();
|
||||
replicator.tell(new Replicator.Get<>(key, Replicator.readLocal(), getReplyProbe.getRef()));
|
||||
@SuppressWarnings("unchecked")
|
||||
Replicator.GetSuccess<GCounter> rsp =
|
||||
getReplyProbe.expectMessageClass(Replicator.GetSuccess.class);
|
||||
assertEquals(1, rsp.get(key).getValue().intValue());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.akka.cluster.ddata.typed.javadsl;
|
||||
|
||||
// #sample
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
||||
import akka.actor.typed.javadsl.ActorContext;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.javadsl.Receive;
|
||||
import akka.cluster.ddata.GCounter;
|
||||
import akka.cluster.ddata.Key;
|
||||
import akka.cluster.ddata.SelfUniqueAddress;
|
||||
import akka.cluster.ddata.typed.javadsl.DistributedData;
|
||||
import akka.cluster.ddata.typed.javadsl.Replicator;
|
||||
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
|
||||
|
||||
// #sample
|
||||
|
||||
interface ReplicatorDocSample {
|
||||
// #sample
|
||||
public class Counter extends AbstractBehavior<Counter.Command> {
|
||||
interface Command {}
|
||||
|
||||
enum Increment implements Command {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
public static class GetValue implements Command {
|
||||
public final ActorRef<Integer> replyTo;
|
||||
|
||||
public GetValue(ActorRef<Integer> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
public static class GetCachedValue implements Command {
|
||||
public final ActorRef<Integer> replyTo;
|
||||
|
||||
public GetCachedValue(ActorRef<Integer> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
private interface InternalCommand extends Command {}
|
||||
|
||||
private static class InternalUpdateResponse implements InternalCommand {
|
||||
final Replicator.UpdateResponse<GCounter> rsp;
|
||||
|
||||
InternalUpdateResponse(Replicator.UpdateResponse<GCounter> rsp) {
|
||||
this.rsp = rsp;
|
||||
}
|
||||
}
|
||||
|
||||
private static class InternalGetResponse implements InternalCommand {
|
||||
final Replicator.GetResponse<GCounter> rsp;
|
||||
final ActorRef<Integer> replyTo;
|
||||
|
||||
InternalGetResponse(Replicator.GetResponse<GCounter> rsp, ActorRef<Integer> replyTo) {
|
||||
this.rsp = rsp;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class InternalSubscribeResponse implements InternalCommand {
|
||||
final Replicator.SubscribeResponse<GCounter> rsp;
|
||||
|
||||
InternalSubscribeResponse(Replicator.SubscribeResponse<GCounter> rsp) {
|
||||
this.rsp = rsp;
|
||||
}
|
||||
}
|
||||
|
||||
public static Behavior<Command> create(Key<GCounter> key) {
|
||||
return Behaviors.setup(
|
||||
ctx ->
|
||||
DistributedData.withReplicatorMessageAdapter(
|
||||
(ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter) ->
|
||||
new Counter(ctx, replicatorAdapter, key)));
|
||||
}
|
||||
|
||||
private final ActorContext<Command> context;
|
||||
// adapter that turns the response messages from the replicator into our own protocol
|
||||
private final ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter;
|
||||
private final SelfUniqueAddress node;
|
||||
private final Key<GCounter> key;
|
||||
|
||||
private int cachedValue = 0;
|
||||
|
||||
private Counter(
|
||||
ActorContext<Command> context,
|
||||
ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter,
|
||||
Key<GCounter> key) {
|
||||
|
||||
this.context = context;
|
||||
this.replicatorAdapter = replicatorAdapter;
|
||||
this.key = key;
|
||||
|
||||
this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
|
||||
|
||||
this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive<Command> createReceive() {
|
||||
return newReceiveBuilder()
|
||||
.onMessage(Increment.class, this::onIncrement)
|
||||
.onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
|
||||
.onMessage(GetValue.class, this::onGetValue)
|
||||
.onMessage(GetCachedValue.class, this::onGetCachedValue)
|
||||
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
|
||||
.onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse)
|
||||
.build();
|
||||
}
|
||||
|
||||
private Behavior<Command> onIncrement(Increment cmd) {
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo ->
|
||||
new Replicator.Update<>(
|
||||
key,
|
||||
GCounter.empty(),
|
||||
Replicator.writeLocal(),
|
||||
askReplyTo,
|
||||
curr -> curr.increment(node, 1)),
|
||||
InternalUpdateResponse::new);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<Command> onGetValue(GetValue cmd) {
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo),
|
||||
rsp -> new InternalGetResponse(rsp, cmd.replyTo));
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<Command> onGetCachedValue(GetCachedValue cmd) {
|
||||
cmd.replyTo.tell(cachedValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
private Behavior<Command> onInternalGetResponse(InternalGetResponse msg) {
|
||||
if (msg.rsp instanceof Replicator.GetSuccess) {
|
||||
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
|
||||
msg.replyTo.tell(value);
|
||||
return this;
|
||||
} else {
|
||||
// not dealing with failures
|
||||
return Behaviors.unhandled();
|
||||
}
|
||||
}
|
||||
|
||||
private Behavior<Command> onInternalSubscribeResponse(InternalSubscribeResponse msg) {
|
||||
if (msg.rsp instanceof Replicator.Changed) {
|
||||
GCounter counter = ((Replicator.Changed<?>) msg.rsp).get(key);
|
||||
cachedValue = counter.getValue().intValue();
|
||||
return this;
|
||||
} else {
|
||||
// no deletes
|
||||
return Behaviors.unhandled();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// #sample
|
||||
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.akka.cluster.ddata.typed.javadsl;
|
||||
|
||||
import akka.actor.testkit.typed.javadsl.LogCapturing;
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.cluster.ddata.GCounter;
|
||||
import akka.cluster.ddata.GCounterKey;
|
||||
import akka.cluster.ddata.Key;
|
||||
import akka.cluster.ddata.typed.javadsl.DistributedData;
|
||||
import akka.cluster.ddata.typed.javadsl.Replicator;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import static jdocs.akka.cluster.ddata.typed.javadsl.ReplicatorDocSample.Counter;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ReplicatorDocTest extends JUnitSuite {
|
||||
|
||||
static Config config =
|
||||
ConfigFactory.parseString(
|
||||
"akka.actor.provider = cluster \n"
|
||||
+ "akka.remote.classic.netty.tcp.port = 0 \n"
|
||||
+ "akka.remote.artery.canonical.port = 0 \n"
|
||||
+ "akka.remote.artery.canonical.hostname = 127.0.0.1 \n");
|
||||
|
||||
@ClassRule public static TestKitJunitResource testKit = new TestKitJunitResource(config);
|
||||
|
||||
@Rule public final LogCapturing logCapturing = new LogCapturing();
|
||||
|
||||
@Test
|
||||
public void shouldHaveApiForUpdateAndGet() {
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
ActorRef<Counter.Command> client =
|
||||
testKit.spawn(Counter.create(GCounterKey.create("counter1")));
|
||||
|
||||
client.tell(Counter.Increment.INSTANCE);
|
||||
client.tell(new Counter.GetValue(probe.getRef()));
|
||||
probe.expectMessage(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHaveApiForSubscribe() {
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
ActorRef<Counter.Command> client =
|
||||
testKit.spawn(Counter.create(GCounterKey.create("counter2")));
|
||||
|
||||
client.tell(Counter.Increment.INSTANCE);
|
||||
client.tell(Counter.Increment.INSTANCE);
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
client.tell(new Counter.GetCachedValue(probe.getRef()));
|
||||
probe.expectMessage(2);
|
||||
return null;
|
||||
});
|
||||
|
||||
client.tell(Counter.Increment.INSTANCE);
|
||||
probe.awaitAssert(
|
||||
() -> {
|
||||
client.tell(new Counter.GetCachedValue(probe.getRef()));
|
||||
probe.expectMessage(3);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHaveAnExtension() {
|
||||
Key<GCounter> key = GCounterKey.create("counter3");
|
||||
ActorRef<Counter.Command> client = testKit.spawn(Counter.create(key));
|
||||
|
||||
TestProbe<Integer> probe = testKit.createTestProbe(Integer.class);
|
||||
client.tell(Counter.Increment.INSTANCE);
|
||||
client.tell(new Counter.GetValue(probe.getRef()));
|
||||
probe.expectMessage(1);
|
||||
|
||||
TestProbe<Replicator.GetResponse<GCounter>> getReplyProbe = testKit.createTestProbe();
|
||||
ActorRef<Replicator.Command> replicator = DistributedData.get(testKit.system()).replicator();
|
||||
replicator.tell(new Replicator.Get<>(key, Replicator.readLocal(), getReplyProbe.getRef()));
|
||||
@SuppressWarnings("unchecked")
|
||||
Replicator.GetSuccess<GCounter> rsp =
|
||||
getReplyProbe.expectMessageClass(Replicator.GetSuccess.class);
|
||||
assertEquals(1, rsp.get(key).getValue().intValue());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.ddata.typed.scaladsl
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Scheduler
|
||||
import akka.cluster.ddata.GCounter
|
||||
import akka.cluster.ddata.GCounterKey
|
||||
import akka.cluster.ddata.SelfUniqueAddress
|
||||
import akka.cluster.ddata.typed.scaladsl.Replicator._
|
||||
import akka.util.Timeout
|
||||
|
||||
object ReplicatorCompileOnlyTest {
|
||||
sealed trait ClientCommand
|
||||
private sealed trait InternalMsg extends ClientCommand
|
||||
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
|
||||
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
|
||||
extends InternalMsg
|
||||
|
||||
def shouldHaveConvenienceForAsk(): Unit = {
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
|
||||
val replicator: ActorRef[Replicator.Command] = ???
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
implicit val scheduler: Scheduler = ???
|
||||
implicit val cluster: SelfUniqueAddress = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(key, Replicator.ReadLocal))
|
||||
|
||||
val reply2: Future[UpdateResponse[GCounter]] =
|
||||
replicator.ask(Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1))
|
||||
|
||||
val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(key, Replicator.WriteLocal))
|
||||
|
||||
val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount())
|
||||
|
||||
// suppress unused compiler warnings
|
||||
println("" + reply1 + reply2 + reply3 + reply4)
|
||||
}
|
||||
|
||||
def shouldHaveConvenienceForAsk2(): Unit = {
|
||||
implicit val cluster: SelfUniqueAddress = ???
|
||||
val replicatorAdapter: ReplicatorMessageAdapter[ClientCommand, GCounter] = ???
|
||||
val replyTo: ActorRef[Int] = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
//#curried-update
|
||||
// alternative way to define the `createRequest` function
|
||||
// Replicator.Update instance has a curried `apply` method
|
||||
replicatorAdapter.askUpdate(
|
||||
Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
|
||||
// that is the same as
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
//#curried-update
|
||||
|
||||
//#curried-get
|
||||
// alternative way to define the `createRequest` function
|
||||
// Replicator.Get instance has a curried `apply` method
|
||||
replicatorAdapter.askGet(Replicator.Get(key, Replicator.ReadLocal), value => InternalGetResponse(value, replyTo))
|
||||
|
||||
// that is the same as
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
|
||||
value => InternalGetResponse(value, replyTo))
|
||||
//#curried-get
|
||||
}
|
||||
|
||||
def shouldHaveUnapplyForResponseTypes(): Unit = {
|
||||
val getResponse: GetResponse[GCounter] = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
getResponse match {
|
||||
case GetSuccess(`key`) =>
|
||||
case GetFailure(`key`) =>
|
||||
case NotFound(`key`) =>
|
||||
case GetDataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val updateResponse: UpdateResponse[GCounter] = ???
|
||||
updateResponse match {
|
||||
case UpdateSuccess(`key`) =>
|
||||
case ModifyFailure(`key`, _, _) =>
|
||||
case UpdateTimeout(`key`) =>
|
||||
case StoreFailure(`key`) =>
|
||||
case UpdateFailure(`key`) =>
|
||||
case UpdateDataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val deleteResponse: DeleteResponse[GCounter] = ???
|
||||
deleteResponse match {
|
||||
case DeleteSuccess(`key`) =>
|
||||
case DeleteFailure(`key`) =>
|
||||
case DataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val subscribeResponse: SubscribeResponse[GCounter] = ???
|
||||
subscribeResponse match {
|
||||
case Changed(`key`) =>
|
||||
case Deleted(`key`) =>
|
||||
}
|
||||
|
||||
val replicaCount: ReplicaCount = ???
|
||||
replicaCount match {
|
||||
case ReplicaCount(_) =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,251 +1,15 @@
|
|||
/*
|
||||
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.ddata.typed.scaladsl
|
||||
|
||||
// FIXME move to doc package
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import org.scalatest.WordSpecLike
|
||||
import akka.cluster.ddata.SelfUniqueAddress
|
||||
|
||||
// #sample
|
||||
import akka.actor.typed.Scheduler
|
||||
import akka.actor.typed.{ ActorRef, Behavior }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.cluster.ddata.typed.scaladsl.Replicator._
|
||||
import akka.cluster.ddata.{ GCounter, GCounterKey }
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import akka.util.Timeout
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
// #sample
|
||||
|
||||
object ReplicatorSpec {
|
||||
|
||||
val config = ConfigFactory.parseString("""
|
||||
akka.actor.provider = "cluster"
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||
""")
|
||||
|
||||
// #sample
|
||||
sealed trait ClientCommand
|
||||
final case object Increment extends ClientCommand
|
||||
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
|
||||
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
|
||||
private sealed trait InternalMsg extends ClientCommand
|
||||
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
|
||||
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
|
||||
extends InternalMsg
|
||||
private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalMsg
|
||||
|
||||
def client(key: GCounterKey): Behavior[ClientCommand] =
|
||||
Behaviors.setup[ClientCommand] { ctx =>
|
||||
implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress
|
||||
|
||||
// adapter that turns the response messages from the replicator into our own protocol
|
||||
DistributedData.withReplicatorMessageAdapter[ClientCommand, GCounter] { replicatorAdapter =>
|
||||
replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply)
|
||||
|
||||
def behavior(cachedValue: Int): Behavior[ClientCommand] = {
|
||||
Behaviors.receiveMessage[ClientCommand] {
|
||||
case Increment =>
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
|
||||
Behaviors.same
|
||||
|
||||
case GetValue(replyTo) =>
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
|
||||
value => InternalGetResponse(value, replyTo))
|
||||
|
||||
Behaviors.same
|
||||
|
||||
case GetCachedValue(replyTo) =>
|
||||
replyTo ! cachedValue
|
||||
Behaviors.same
|
||||
|
||||
case internal: InternalMsg =>
|
||||
internal match {
|
||||
case InternalUpdateResponse(_) => Behaviors.same // ok
|
||||
|
||||
case InternalGetResponse(rsp @ Replicator.GetSuccess(`key`), replyTo) =>
|
||||
val value = rsp.get(key).value.toInt
|
||||
replyTo ! value
|
||||
Behaviors.same
|
||||
|
||||
case InternalGetResponse(_, _) =>
|
||||
Behaviors.unhandled // not dealing with failures
|
||||
|
||||
case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) =>
|
||||
val value = chg.get(key).value.intValue
|
||||
behavior(value)
|
||||
|
||||
case InternalSubscribeResponse(Replicator.Deleted(_)) =>
|
||||
Behaviors.unhandled // no deletes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
behavior(cachedValue = 0)
|
||||
}
|
||||
}
|
||||
// #sample
|
||||
|
||||
object CompileOnlyTest {
|
||||
def shouldHaveConvenienceForAsk(): Unit = {
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
|
||||
val replicator: ActorRef[Replicator.Command] = ???
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
implicit val scheduler: Scheduler = ???
|
||||
implicit val cluster: SelfUniqueAddress = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(key, Replicator.ReadLocal))
|
||||
|
||||
val reply2: Future[UpdateResponse[GCounter]] =
|
||||
replicator.ask(Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1))
|
||||
|
||||
val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(key, Replicator.WriteLocal))
|
||||
|
||||
val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount())
|
||||
|
||||
// suppress unused compiler warnings
|
||||
println("" + reply1 + reply2 + reply3 + reply4)
|
||||
}
|
||||
|
||||
def shouldHaveConvenienceForAsk2(): Unit = {
|
||||
implicit val cluster: SelfUniqueAddress = ???
|
||||
val replicatorAdapter: ReplicatorMessageAdapter[ClientCommand, GCounter] = ???
|
||||
val replyTo: ActorRef[Int] = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
//#curried-update
|
||||
// alternative way to define the `createRequest` function
|
||||
// Replicator.Update instance has a curried `apply` method
|
||||
replicatorAdapter.askUpdate(
|
||||
Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
|
||||
// that is the same as
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
//#curried-update
|
||||
|
||||
//#curried-get
|
||||
// alternative way to define the `createRequest` function
|
||||
// Replicator.Get instance has a curried `apply` method
|
||||
replicatorAdapter.askGet(Replicator.Get(key, Replicator.ReadLocal), value => InternalGetResponse(value, replyTo))
|
||||
|
||||
// that is the same as
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
|
||||
value => InternalGetResponse(value, replyTo))
|
||||
//#curried-get
|
||||
}
|
||||
|
||||
def shouldHaveUnapplyForResponseTypes(): Unit = {
|
||||
val getResponse: GetResponse[GCounter] = ???
|
||||
val key = GCounterKey("counter")
|
||||
|
||||
getResponse match {
|
||||
case GetSuccess(`key`) =>
|
||||
case GetFailure(`key`) =>
|
||||
case NotFound(`key`) =>
|
||||
case GetDataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val updateResponse: UpdateResponse[GCounter] = ???
|
||||
updateResponse match {
|
||||
case UpdateSuccess(`key`) =>
|
||||
case ModifyFailure(`key`, _, _) =>
|
||||
case UpdateTimeout(`key`) =>
|
||||
case StoreFailure(`key`) =>
|
||||
case UpdateFailure(`key`) =>
|
||||
case UpdateDataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val deleteResponse: DeleteResponse[GCounter] = ???
|
||||
deleteResponse match {
|
||||
case DeleteSuccess(`key`) =>
|
||||
case DeleteFailure(`key`) =>
|
||||
case DataDeleted(`key`) =>
|
||||
}
|
||||
|
||||
val subscribeResponse: SubscribeResponse[GCounter] = ???
|
||||
subscribeResponse match {
|
||||
case Changed(`key`) =>
|
||||
case Deleted(`key`) =>
|
||||
}
|
||||
|
||||
val replicaCount: ReplicaCount = ???
|
||||
replicaCount match {
|
||||
case ReplicaCount(_) =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ReplicatorSpec extends ScalaTestWithActorTestKit(ReplicatorSpec.config) with WordSpecLike with LogCapturing {
|
||||
|
||||
import ReplicatorSpec._
|
||||
|
||||
implicit val selfNodeAddress = DistributedData(system).selfUniqueAddress
|
||||
|
||||
class ReplicatorSpec extends ScalaTestWithActorTestKit with WordSpecLike with LogCapturing {
|
||||
"Replicator" must {
|
||||
|
||||
"have API for Update and Get" in {
|
||||
val c = spawn(client(GCounterKey("counter1")))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Increment
|
||||
c ! GetValue(probe.ref)
|
||||
probe.expectMessage(1)
|
||||
}
|
||||
|
||||
"have API for Subscribe" in {
|
||||
val c = spawn(client(GCounterKey("counter2")))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Increment
|
||||
c ! Increment
|
||||
eventually {
|
||||
c ! GetCachedValue(probe.ref)
|
||||
probe.expectMessage(2)
|
||||
}
|
||||
c ! Increment
|
||||
eventually {
|
||||
c ! GetCachedValue(probe.ref)
|
||||
probe.expectMessage(3)
|
||||
}
|
||||
}
|
||||
|
||||
"have an extension" in {
|
||||
val key = GCounterKey("counter3")
|
||||
val c = spawn(client(key))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Increment
|
||||
c ! GetValue(probe.ref)
|
||||
probe.expectMessage(1)
|
||||
|
||||
val getReplyProbe = createTestProbe[Replicator.GetResponse[GCounter]]()
|
||||
val replicator = DistributedData(system).replicator
|
||||
replicator ! Replicator.Get(key, Replicator.ReadLocal, getReplyProbe.ref)
|
||||
val rsp = getReplyProbe.expectMessageType[GetSuccess[GCounter]]
|
||||
rsp.get(key).value.toInt should ===(1)
|
||||
}
|
||||
|
||||
"have the prefixed replicator name" in {
|
||||
ReplicatorSettings.name(system) should ===("typedDdataReplicator")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.cluster.ddata.typed.scaladsl
|
||||
|
||||
import akka.cluster.ddata.SelfUniqueAddress
|
||||
import akka.cluster.ddata.typed.scaladsl.DistributedData
|
||||
import akka.cluster.ddata.typed.scaladsl.Replicator
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
// #sample
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.cluster.ddata.GCounter
|
||||
import akka.cluster.ddata.GCounterKey
|
||||
import akka.cluster.ddata.typed.scaladsl.Replicator._
|
||||
|
||||
// #sample
|
||||
|
||||
object ReplicatorDocSpec {
|
||||
|
||||
val config = ConfigFactory.parseString("""
|
||||
akka.actor.provider = cluster
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||
""")
|
||||
|
||||
// #sample
|
||||
object Counter {
|
||||
sealed trait Command
|
||||
final case object Increment extends Command
|
||||
final case class GetValue(replyTo: ActorRef[Int]) extends Command
|
||||
final case class GetCachedValue(replyTo: ActorRef[Int]) extends Command
|
||||
private sealed trait InternalCommand extends Command
|
||||
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalCommand
|
||||
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
|
||||
extends InternalCommand
|
||||
private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalCommand
|
||||
|
||||
def apply(key: GCounterKey): Behavior[Command] =
|
||||
Behaviors.setup[Command] { ctx =>
|
||||
implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress
|
||||
|
||||
// adapter that turns the response messages from the replicator into our own protocol
|
||||
DistributedData.withReplicatorMessageAdapter[Command, GCounter] { replicatorAdapter =>
|
||||
replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply)
|
||||
|
||||
def updated(cachedValue: Int): Behavior[Command] = {
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case Increment =>
|
||||
replicatorAdapter.askUpdate(
|
||||
askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),
|
||||
InternalUpdateResponse.apply)
|
||||
|
||||
Behaviors.same
|
||||
|
||||
case GetValue(replyTo) =>
|
||||
replicatorAdapter.askGet(
|
||||
askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
|
||||
value => InternalGetResponse(value, replyTo))
|
||||
|
||||
Behaviors.same
|
||||
|
||||
case GetCachedValue(replyTo) =>
|
||||
replyTo ! cachedValue
|
||||
Behaviors.same
|
||||
|
||||
case internal: InternalCommand =>
|
||||
internal match {
|
||||
case InternalUpdateResponse(_) => Behaviors.same // ok
|
||||
|
||||
case InternalGetResponse(rsp @ Replicator.GetSuccess(`key`), replyTo) =>
|
||||
val value = rsp.get(key).value.toInt
|
||||
replyTo ! value
|
||||
Behaviors.same
|
||||
|
||||
case InternalGetResponse(_, _) =>
|
||||
Behaviors.unhandled // not dealing with failures
|
||||
|
||||
case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) =>
|
||||
val value = chg.get(key).value.intValue
|
||||
updated(value)
|
||||
|
||||
case InternalSubscribeResponse(Replicator.Deleted(_)) =>
|
||||
Behaviors.unhandled // no deletes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updated(cachedValue = 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
// #sample
|
||||
|
||||
}
|
||||
|
||||
class ReplicatorDocSpec
|
||||
extends ScalaTestWithActorTestKit(ReplicatorDocSpec.config)
|
||||
with WordSpecLike
|
||||
with LogCapturing {
|
||||
|
||||
import ReplicatorDocSpec._
|
||||
|
||||
implicit val selfNodeAddress = DistributedData(system).selfUniqueAddress
|
||||
|
||||
"Replicator" must {
|
||||
|
||||
"have API for Update and Get" in {
|
||||
val c = spawn(Counter(GCounterKey("counter1")))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Counter.Increment
|
||||
c ! Counter.GetValue(probe.ref)
|
||||
probe.expectMessage(1)
|
||||
}
|
||||
|
||||
"have API for Subscribe" in {
|
||||
val c = spawn(Counter(GCounterKey("counter2")))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Counter.Increment
|
||||
c ! Counter.Increment
|
||||
eventually {
|
||||
c ! Counter.GetCachedValue(probe.ref)
|
||||
probe.expectMessage(2)
|
||||
}
|
||||
c ! Counter.Increment
|
||||
eventually {
|
||||
c ! Counter.GetCachedValue(probe.ref)
|
||||
probe.expectMessage(3)
|
||||
}
|
||||
}
|
||||
|
||||
"have an extension" in {
|
||||
val key = GCounterKey("counter3")
|
||||
val c = spawn(Counter(key))
|
||||
|
||||
val probe = createTestProbe[Int]()
|
||||
c ! Counter.Increment
|
||||
c ! Counter.GetValue(probe.ref)
|
||||
probe.expectMessage(1)
|
||||
|
||||
val getReplyProbe = createTestProbe[Replicator.GetResponse[GCounter]]()
|
||||
val replicator = DistributedData(system).replicator
|
||||
replicator ! Replicator.Get(key, Replicator.ReadLocal, getReplyProbe.ref)
|
||||
val rsp = getReplyProbe.expectMessageType[GetSuccess[GCounter]]
|
||||
rsp.get(key).value.toInt should ===(1)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -50,10 +50,10 @@ This sample uses the replicated data type `GCounter` to implement a counter that
|
|||
cluster:
|
||||
|
||||
Scala
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #sample }
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/ddata/typed/scaladsl/ReplicatorDocSpec.scala) { #sample }
|
||||
|
||||
Java
|
||||
: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java) { #sample }
|
||||
: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/ddata/typed/javadsl/ReplicatorDocSample.java) { #sample }
|
||||
|
||||
Although you can interact with the `Replicator` using the @scala[`ActorRef[Replicator.Command]`]@java[`ActorRef<Replicator.Command>`]
|
||||
from @scala[`DistributedData(ctx.system).replicator`]@java[`DistributedData(ctx.getSystem()).replicator()`] it's
|
||||
|
|
@ -86,12 +86,12 @@ for more details about `Get`, `Update` and `Delete` interactions with the replic
|
|||
There is alternative way of constructing the function for the `Update` message:
|
||||
|
||||
Scala
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #curried-update }
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala) { #curried-update }
|
||||
|
||||
Similar is supported for `Get` and `Delete`:
|
||||
|
||||
Scala
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #curried-get }
|
||||
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorCompileOnlyTest.scala) { #curried-get }
|
||||
|
||||
@@@
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue