Docs for typed distributed data (#25747)

* Typed distributed data docs shaping up
This commit is contained in:
Johan Andrén 2018-10-12 09:14:01 +02:00 committed by Christopher Batey
parent 0c608321b9
commit b0de255d27
3 changed files with 203 additions and 71 deletions

View file

@ -4,16 +4,15 @@
package akka.cluster.ddata.typed.javadsl;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorSettings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Optional;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import akka.actor.ActorSystem;
// #sample
import java.util.Optional;
import akka.actor.typed.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.GCounter;
import akka.cluster.ddata.GCounterKey;
@ -29,12 +28,15 @@ import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.MutableBehavior;
import akka.actor.typed.javadsl.ActorContext;
// #sample
public class ReplicatorTest extends JUnitSuite {
interface ClientCommand {
}
// #sample
interface ClientCommand { }
static final class Increment implements ClientCommand {
enum Increment implements ClientCommand {
INSTANCE
}
static final class GetValue implements ClientCommand {
@ -53,36 +55,35 @@ public class ReplicatorTest extends JUnitSuite {
}
}
interface InternalMsg extends ClientCommand {
}
private interface InternalMsg extends ClientCommand { }
static final class InternalUpdateResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.UpdateResponse<A> rsp;
private static final class InternalUpdateResponse implements InternalMsg {
final Replicator.UpdateResponse<GCounter> rsp;
InternalUpdateResponse(Replicator.UpdateResponse<A> rsp) {
InternalUpdateResponse(Replicator.UpdateResponse<GCounter> rsp) {
this.rsp = rsp;
}
}
static final class InternalGetResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.GetResponse<A> rsp;
private static final class InternalGetResponse implements InternalMsg {
final Replicator.GetResponse<GCounter> rsp;
InternalGetResponse(Replicator.GetResponse<A> rsp) {
InternalGetResponse(Replicator.GetResponse<GCounter> rsp) {
this.rsp = rsp;
}
}
static final class InternalChanged<A extends ReplicatedData> implements InternalMsg {
final Replicator.Changed<A> chg;
private static final class InternalChanged implements InternalMsg {
final Replicator.Changed<GCounter> chg;
InternalChanged(Replicator.Changed<A> chg) {
InternalChanged(Replicator.Changed<GCounter> chg) {
this.chg = chg;
}
}
static final Key<GCounter> Key = GCounterKey.create("counter");
static class Client extends MutableBehavior<ClientCommand> {
static class Counter extends MutableBehavior<ClientCommand> {
private final ActorRef<Replicator.Command> replicator;
private final Cluster node;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
@ -91,10 +92,12 @@ public class ReplicatorTest extends JUnitSuite {
private int cachedValue = 0;
Client(ActorRef<Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
Counter(ActorRef<Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
this.replicator = replicator;
this.node = node;
// adapters turning the messages from the replicator
// into our own protocol
updateResponseAdapter = ctx.messageAdapter(
(Class<Replicator.UpdateResponse<GCounter>>) (Object) Replicator.UpdateResponse.class,
msg -> new InternalUpdateResponse(msg));
@ -110,50 +113,80 @@ public class ReplicatorTest extends JUnitSuite {
replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}
public static Behavior<ClientCommand> create(ActorRef<Command> replicator, Cluster node) {
return Behaviors.setup(ctx -> new Client(replicator, node, ctx));
public static Behavior<ClientCommand> create() {
return Behaviors.setup((ctx) -> {
// The distributed data types still need the implicit untyped Cluster.
// We will look into another solution for that.
Cluster node = Cluster.get(Adapter.toUntyped(ctx.getSystem()));
ActorRef<Replicator.Command> replicator = DistributedData.get(ctx.getSystem()).replicator();
return new Counter(replicator, node, ctx);
});
}
// #sample
// omitted from sample, needed for tests, factory above is for the docs sample
public static Behavior<ClientCommand> create(ActorRef<Command> replicator, Cluster node) {
return Behaviors.setup(ctx -> new Counter(replicator, node, ctx));
}
// #sample
@Override
public Behaviors.Receive<ClientCommand> createReceive() {
return receiveBuilder()
.onMessage(Increment.class, cmd -> {
replicator.tell(
new Replicator.Update<>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter,
curr -> curr.increment(node, 1)));
return this;
})
.onMessage(InternalUpdateResponse.class, msg -> {
return this;
})
.onMessage(GetValue.class, cmd -> {
replicator.tell(
new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return this;
})
.onMessage(GetCachedValue.class, cmd -> {
cmd.replyTo.tell(cachedValue);
return this;
})
.onMessage(InternalGetResponse.class, msg -> {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
replyTo.tell(value);
} else {
// not dealing with failures
}
return this;
})
.onMessage(InternalChanged.class, msg -> {
GCounter counter = (GCounter) msg.chg.get(Key);
cachedValue = counter.getValue().intValue();
return this;
})
.onMessage(Increment.class, this::onIncrement)
.onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
.onMessage(GetValue.class, this::onGetValue)
.onMessage(GetCachedValue.class, this::onGetCachedValue)
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
.onMessage(InternalChanged.class, this::onInternalChanged)
.build();
}
private Behavior<ClientCommand> onIncrement(Increment cmd) {
replicator.tell(
new Replicator.Update<>(
Key,
GCounter.empty(),
Replicator.writeLocal(),
updateResponseAdapter,
curr -> curr.increment(node, 1)));
return Behaviors.same();
}
private Behavior<ClientCommand> onGetValue(GetValue cmd) {
replicator.tell(
new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return Behaviors.same();
}
private Behavior<ClientCommand> onGetCachedValue(GetCachedValue cmd) {
cmd.replyTo.tell(cachedValue);
return Behaviors.same();
}
private Behavior<ClientCommand> onInternalGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
replyTo.tell(value);
return Behaviors.same();
} else {
// not dealing with failures
return Behaviors.unhandled();
}
}
private Behavior<ClientCommand> onInternalChanged(InternalChanged msg) {
GCounter counter = msg.chg.get(Key);
cachedValue = counter.getValue().intValue();
return this;
}
}
// #sample
static Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
@ -165,14 +198,13 @@ public class ReplicatorTest extends JUnitSuite {
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest",
config);
private final ActorSystem system = actorSystemResource.getSystem();
private final akka.actor.ActorSystem system = actorSystemResource.getSystem();
akka.actor.typed.ActorSystem<?> typedSystem() {
ActorSystem<?> typedSystem() {
return Adapter.toTyped(system);
}
@Test
public void shouldHaveApiForUpdateAndGet() {
TestKit probe = new TestKit(system);
@ -180,9 +212,9 @@ public class ReplicatorTest extends JUnitSuite {
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));
Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system)));
client.tell(new Increment());
client.tell(Increment.INSTANCE);
client.tell(new GetValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(1);
}
@ -194,17 +226,17 @@ public class ReplicatorTest extends JUnitSuite {
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));
Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system)));
client.tell(new Increment());
client.tell(new Increment());
client.tell(Increment.INSTANCE);
client.tell(Increment.INSTANCE);
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(2);
return null;
});
client.tell(new Increment());
client.tell(Increment.INSTANCE);
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(3);

View file

@ -4,6 +4,10 @@
package akka.cluster.ddata.typed.scaladsl
import org.scalatest.WordSpecLike
import akka.actor.testkit.typed.TestKitSettings
// #sample
import akka.actor.Scheduler
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.AskPattern._
@ -13,14 +17,14 @@ import akka.cluster.Cluster
import akka.cluster.ddata.typed.scaladsl.Replicator._
import akka.cluster.ddata.{ GCounter, GCounterKey, ReplicatedData }
import akka.actor.testkit.typed.scaladsl._
import akka.actor.testkit.typed.TestKitSettings
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.concurrent.Future
import scala.concurrent.duration._
// #sample
object ReplicatorSpec {
val config = ConfigFactory.parseString(
@ -31,20 +35,24 @@ object ReplicatorSpec {
akka.remote.artery.canonical.hostname = 127.0.0.1
""")
// #sample
sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg
private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg
private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter]) extends InternalMsg
private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg
val Key = GCounterKey("counter")
def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
Behaviors.setup[ClientCommand] { ctx
// The distributed data types still need the implicit untyped Cluster.
// We will look into another solution for that.
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
ctx.messageAdapter(InternalUpdateResponse.apply)
@ -92,6 +100,7 @@ object ReplicatorSpec {
behavior(cachedValue = 0)
}
// #sample
object CompileOnlyTest {
def shouldHaveConvenienceForAsk(): Unit = {

View file

@ -1,5 +1,96 @@
# Distributed Data
TODO https://github.com/akka/akka/issues/24494
## Dependency
To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project:
@@dependency[sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster-typed_$scala.binary_version$
version=$akka.version$
}
## Introduction
*Akka Distributed Data* is useful when you need to share data between nodes in an
Akka Cluster. The data is accessed with an actor providing a key-value store like API.
The keys are unique identifiers with type information of the data values. The values
are *Conflict Free Replicated Data Types* (CRDTs).
All data entries are spread to all nodes, or nodes with a certain role, in the cluster
via direct replication and gossip based dissemination. You have fine grained control
of the consistency level for reads and writes.
The nature CRDTs makes it possible to perform updates from any node without coordination.
Concurrent updates from different nodes will automatically be resolved by the monotonic
merge function, which all data types must provide. The state changes always converge.
Several useful data types for counters, sets, maps and registers are provided and
you can also implement your own custom data types.
It is eventually consistent and geared toward providing high read and write availability
(partition tolerance), with low latency. Note that in an eventually consistent system a read may return an
out-of-date value.
## Using the Replicator
The @scala[@unidoc[akka.cluster.ddata.typed.scaladsl.Replicator]]@java[@unidoc[akka.cluster.ddata.typed.javadsl.Replicator]]
actor provides the API for interacting with the data and is accessed through the extension
@scala[@unidoc[akka.cluster.ddata.typed.scaladsl.DistributedData]]@java[@unidoc[akka.cluster.ddata.typed.javadsl.DistributedData]].
The messages for the replicator, such as `Replicator.Update` are defined in @scala[`akka.cluster.ddata.typed.scaladsl.Replicator`]
@java[`akka.cluster.ddata.typed.scaladsl.Replicator`] but the actual CRDTs are the
same as in untyped, for example `akka.cluster.ddata.GCounter`. This will require an @scala[implicit] untyped `Cluster`
for now, we hope to improve this in the future ([issue #25746](https://github.com/akka/akka/issues/25746)).
The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a
key identifying the entry and helping us know what type it has, and then use that key for every interaction with
the replicator. Each replicated data type contains a factory for defining such a key.
This sample uses the replicated data type `GCounter` to implement a counter that can be written to on any node of the
cluster:
Scala
: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #sample }
Java
: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java) { #sample }
When we start up the actor we subscribe it to changes for our key, this means that whenever the replicator see a change
for the counter our actor will get a @scala[`Replicator.Changed[GCounter]`]@java[`Replicator.Changed<GCounter>`], since
this is not a message in our protocol, we use an adapter to wrap it in the internal `InternalChanged` message, which
is then handled in the regular message handling of the behavior.
For an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request, it contains five values:
1. the @scala[`Key`]@java[`KEY`] we want to update
1. the data to use if as the empty state if the replicator has not seen the key before
1. the consistency level we want for the update
1. an @scala[`ActorRef[Replicator.UpdateResponse[GCounter]]`]@java[`ActorRef<Replicator.UpdateResponse<GCounter>>`]
to respond to when the update is completed
1. a function that takes a previous state and updates it, in our case by incrementing it with 1
Whenever the distributed counter is updated, we cache the value so that we can answer requests about the value without
the extra interaction with the replicator using the `GetCachedValue` command.
We also support asking the replicator, using the `GetValue`, demonstrating how many of the replicator commands take
a pass-along value that will be put in the response message so that we do not need to keep a local state tracking
what actors are waiting for responses, but can extract the `replyTo` actor from the replicator when it responds
with a `GetSuccess`. See the @ref[the untyped Distributed Data documentation](../distributed-data.md#using-the-replicator)
for more details about what interactions with the replicator there are.
### Replicated data types
Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types.
For more details, read @ref[the untyped Distributed Data documentation](../distributed-data.md#data-types)
### Running separate instances of the replicator
For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles,
it makes sense to start separate replicators, this needs to be done on all nodes, or
the group of nodes tagged with a specific role. To do this with the Typed Distributed Data you will first
have to start an untyped `Replicator` and pass it to the `Replicator.behavior` method that takes an untyped
actor ref. All such `Replicator`s must run on the same path in the untyped actor hierarchy.
See [https://akka.io/blog/2017/10/04/typed-cluster-tools](https://akka.io/blog/2017/10/04/typed-cluster-tools)