diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java index ab59c5119e..e44619f745 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java @@ -4,16 +4,15 @@ package akka.cluster.ddata.typed.javadsl; -import akka.cluster.ddata.typed.javadsl.Replicator; -import akka.cluster.ddata.typed.javadsl.ReplicatorSettings; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import java.util.Optional; import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import akka.actor.ActorSystem; +// #sample +import java.util.Optional; +import akka.actor.typed.ActorSystem; import akka.cluster.Cluster; import akka.cluster.ddata.GCounter; import akka.cluster.ddata.GCounterKey; @@ -29,12 +28,15 @@ import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.MutableBehavior; import akka.actor.typed.javadsl.ActorContext; +// #sample + public class ReplicatorTest extends JUnitSuite { - interface ClientCommand { - } + // #sample + interface ClientCommand { } - static final class Increment implements ClientCommand { + enum Increment implements ClientCommand { + INSTANCE } static final class GetValue implements ClientCommand { @@ -53,36 +55,35 @@ public class ReplicatorTest extends JUnitSuite { } } - interface InternalMsg extends ClientCommand { - } + private interface InternalMsg extends ClientCommand { } - static final class InternalUpdateResponse implements InternalMsg { - final Replicator.UpdateResponse rsp; + private static final class InternalUpdateResponse implements InternalMsg { + final Replicator.UpdateResponse rsp; - InternalUpdateResponse(Replicator.UpdateResponse rsp) { + InternalUpdateResponse(Replicator.UpdateResponse rsp) { this.rsp = rsp; } } - static final class InternalGetResponse implements InternalMsg { - final Replicator.GetResponse rsp; + private static final class InternalGetResponse implements InternalMsg { + final Replicator.GetResponse rsp; - InternalGetResponse(Replicator.GetResponse rsp) { + InternalGetResponse(Replicator.GetResponse rsp) { this.rsp = rsp; } } - static final class InternalChanged implements InternalMsg { - final Replicator.Changed chg; + private static final class InternalChanged implements InternalMsg { + final Replicator.Changed chg; - InternalChanged(Replicator.Changed chg) { + InternalChanged(Replicator.Changed chg) { this.chg = chg; } } static final Key Key = GCounterKey.create("counter"); - static class Client extends MutableBehavior { + static class Counter extends MutableBehavior { private final ActorRef replicator; private final Cluster node; final ActorRef> updateResponseAdapter; @@ -91,10 +92,12 @@ public class ReplicatorTest extends JUnitSuite { private int cachedValue = 0; - Client(ActorRef replicator, Cluster node, ActorContext ctx) { + Counter(ActorRef replicator, Cluster node, ActorContext ctx) { this.replicator = replicator; this.node = node; + // adapters turning the messages from the replicator + // into our own protocol updateResponseAdapter = ctx.messageAdapter( (Class>) (Object) Replicator.UpdateResponse.class, msg -> new InternalUpdateResponse(msg)); @@ -110,50 +113,80 @@ public class ReplicatorTest extends JUnitSuite { replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter)); } - public static Behavior create(ActorRef replicator, Cluster node) { - return Behaviors.setup(ctx -> new Client(replicator, node, ctx)); + public static Behavior create() { + return Behaviors.setup((ctx) -> { + // The distributed data types still need the implicit untyped Cluster. + // We will look into another solution for that. + Cluster node = Cluster.get(Adapter.toUntyped(ctx.getSystem())); + ActorRef replicator = DistributedData.get(ctx.getSystem()).replicator(); + + return new Counter(replicator, node, ctx); + }); } + // #sample + // omitted from sample, needed for tests, factory above is for the docs sample + public static Behavior create(ActorRef replicator, Cluster node) { + return Behaviors.setup(ctx -> new Counter(replicator, node, ctx)); + } + // #sample + @Override public Behaviors.Receive createReceive() { return receiveBuilder() - .onMessage(Increment.class, cmd -> { - replicator.tell( - new Replicator.Update<>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter, - curr -> curr.increment(node, 1))); - return this; - }) - .onMessage(InternalUpdateResponse.class, msg -> { - return this; - }) - .onMessage(GetValue.class, cmd -> { - replicator.tell( - new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); - return this; - }) - .onMessage(GetCachedValue.class, cmd -> { - cmd.replyTo.tell(cachedValue); - return this; - }) - .onMessage(InternalGetResponse.class, msg -> { - if (msg.rsp instanceof Replicator.GetSuccess) { - int value = ((Replicator.GetSuccess) msg.rsp).get(Key).getValue().intValue(); - ActorRef replyTo = (ActorRef) msg.rsp.request().get(); - replyTo.tell(value); - } else { - // not dealing with failures - } - return this; - }) - .onMessage(InternalChanged.class, msg -> { - GCounter counter = (GCounter) msg.chg.get(Key); - cachedValue = counter.getValue().intValue(); - return this; - }) + .onMessage(Increment.class, this::onIncrement) + .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same()) + .onMessage(GetValue.class, this::onGetValue) + .onMessage(GetCachedValue.class, this::onGetCachedValue) + .onMessage(InternalGetResponse.class, this::onInternalGetResponse) + .onMessage(InternalChanged.class, this::onInternalChanged) .build(); } + + private Behavior onIncrement(Increment cmd) { + replicator.tell( + new Replicator.Update<>( + Key, + GCounter.empty(), + Replicator.writeLocal(), + updateResponseAdapter, + curr -> curr.increment(node, 1))); + return Behaviors.same(); + } + + private Behavior onGetValue(GetValue cmd) { + replicator.tell( + new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); + return Behaviors.same(); + } + + private Behavior onGetCachedValue(GetCachedValue cmd) { + cmd.replyTo.tell(cachedValue); + return Behaviors.same(); + } + + private Behavior onInternalGetResponse(InternalGetResponse msg) { + if (msg.rsp instanceof Replicator.GetSuccess) { + int value = ((Replicator.GetSuccess) msg.rsp).get(Key).getValue().intValue(); + ActorRef replyTo = (ActorRef) msg.rsp.request().get(); + replyTo.tell(value); + return Behaviors.same(); + } else { + // not dealing with failures + return Behaviors.unhandled(); + } + } + + private Behavior onInternalChanged(InternalChanged msg) { + GCounter counter = msg.chg.get(Key); + cachedValue = counter.getValue().intValue(); + return this; + } + } +// #sample + static Config config = ConfigFactory.parseString( "akka.actor.provider = cluster \n" + @@ -165,14 +198,13 @@ public class ReplicatorTest extends JUnitSuite { public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest", config); - private final ActorSystem system = actorSystemResource.getSystem(); + private final akka.actor.ActorSystem system = actorSystemResource.getSystem(); - akka.actor.typed.ActorSystem typedSystem() { + ActorSystem typedSystem() { return Adapter.toTyped(system); } - @Test public void shouldHaveApiForUpdateAndGet() { TestKit probe = new TestKit(system); @@ -180,9 +212,9 @@ public class ReplicatorTest extends JUnitSuite { ActorRef replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef client = - Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); + Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system))); - client.tell(new Increment()); + client.tell(Increment.INSTANCE); client.tell(new GetValue(Adapter.toTyped(probe.getRef()))); probe.expectMsg(1); } @@ -194,17 +226,17 @@ public class ReplicatorTest extends JUnitSuite { ActorRef replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef client = - Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); + Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system))); - client.tell(new Increment()); - client.tell(new Increment()); + client.tell(Increment.INSTANCE); + client.tell(Increment.INSTANCE); probe.awaitAssert(() -> { client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef()))); probe.expectMsg(2); return null; }); - client.tell(new Increment()); + client.tell(Increment.INSTANCE); probe.awaitAssert(() -> { client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef()))); probe.expectMsg(3); diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index af5843d36d..b7b8362b66 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -4,6 +4,10 @@ package akka.cluster.ddata.typed.scaladsl +import org.scalatest.WordSpecLike +import akka.actor.testkit.typed.TestKitSettings + +// #sample import akka.actor.Scheduler import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.AskPattern._ @@ -13,14 +17,14 @@ import akka.cluster.Cluster import akka.cluster.ddata.typed.scaladsl.Replicator._ import akka.cluster.ddata.{ GCounter, GCounterKey, ReplicatedData } import akka.actor.testkit.typed.scaladsl._ -import akka.actor.testkit.typed.TestKitSettings import akka.util.Timeout import com.typesafe.config.ConfigFactory -import org.scalatest.WordSpecLike import scala.concurrent.Future import scala.concurrent.duration._ +// #sample + object ReplicatorSpec { val config = ConfigFactory.parseString( @@ -31,20 +35,24 @@ object ReplicatorSpec { akka.remote.artery.canonical.hostname = 127.0.0.1 """) + // #sample sealed trait ClientCommand final case object Increment extends ClientCommand final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand private sealed trait InternalMsg extends ClientCommand - private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg - private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg - private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg + private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg + private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter]) extends InternalMsg + private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg val Key = GCounterKey("counter") def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] = Behaviors.setup[ClientCommand] { ctx ⇒ + // The distributed data types still need the implicit untyped Cluster. + // We will look into another solution for that. + val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] = ctx.messageAdapter(InternalUpdateResponse.apply) @@ -92,6 +100,7 @@ object ReplicatorSpec { behavior(cachedValue = 0) } + // #sample object CompileOnlyTest { def shouldHaveConvenienceForAsk(): Unit = { diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md index 14b2c9f5e6..cadf5e5ef4 100644 --- a/akka-docs/src/main/paradox/typed/distributed-data.md +++ b/akka-docs/src/main/paradox/typed/distributed-data.md @@ -1,5 +1,96 @@ # Distributed Data -TODO https://github.com/akka/akka/issues/24494 +## Dependency -See [https://akka.io/blog/2017/10/04/typed-cluster-tools](https://akka.io/blog/2017/10/04/typed-cluster-tools) +To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project: + +@@dependency[sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-cluster-typed_$scala.binary_version$ + version=$akka.version$ +} + +## Introduction + +*Akka Distributed Data* is useful when you need to share data between nodes in an +Akka Cluster. The data is accessed with an actor providing a key-value store like API. +The keys are unique identifiers with type information of the data values. The values +are *Conflict Free Replicated Data Types* (CRDTs). + +All data entries are spread to all nodes, or nodes with a certain role, in the cluster +via direct replication and gossip based dissemination. You have fine grained control +of the consistency level for reads and writes. + +The nature CRDTs makes it possible to perform updates from any node without coordination. +Concurrent updates from different nodes will automatically be resolved by the monotonic +merge function, which all data types must provide. The state changes always converge. +Several useful data types for counters, sets, maps and registers are provided and +you can also implement your own custom data types. + +It is eventually consistent and geared toward providing high read and write availability +(partition tolerance), with low latency. Note that in an eventually consistent system a read may return an +out-of-date value. + +## Using the Replicator + +The @scala[@unidoc[akka.cluster.ddata.typed.scaladsl.Replicator]]@java[@unidoc[akka.cluster.ddata.typed.javadsl.Replicator]] +actor provides the API for interacting with the data and is accessed through the extension +@scala[@unidoc[akka.cluster.ddata.typed.scaladsl.DistributedData]]@java[@unidoc[akka.cluster.ddata.typed.javadsl.DistributedData]]. + +The messages for the replicator, such as `Replicator.Update` are defined in @scala[`akka.cluster.ddata.typed.scaladsl.Replicator`] +@java[`akka.cluster.ddata.typed.scaladsl.Replicator`] but the actual CRDTs are the +same as in untyped, for example `akka.cluster.ddata.GCounter`. This will require an @scala[implicit] untyped `Cluster` +for now, we hope to improve this in the future ([issue #25746](https://github.com/akka/akka/issues/25746)). + +The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a +key identifying the entry and helping us know what type it has, and then use that key for every interaction with +the replicator. Each replicated data type contains a factory for defining such a key. + +This sample uses the replicated data type `GCounter` to implement a counter that can be written to on any node of the +cluster: + +Scala +: @@snip [ReplicatorSpec.scala](/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala) { #sample } + +Java +: @@snip [ReplicatorTest.java](/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java) { #sample } + + +When we start up the actor we subscribe it to changes for our key, this means that whenever the replicator see a change +for the counter our actor will get a @scala[`Replicator.Changed[GCounter]`]@java[`Replicator.Changed`], since +this is not a message in our protocol, we use an adapter to wrap it in the internal `InternalChanged` message, which +is then handled in the regular message handling of the behavior. + +For an incoming `Increment` command, we send the `replicator` a `Replicator.Update` request, it contains five values: + + 1. the @scala[`Key`]@java[`KEY`] we want to update + 1. the data to use if as the empty state if the replicator has not seen the key before + 1. the consistency level we want for the update + 1. an @scala[`ActorRef[Replicator.UpdateResponse[GCounter]]`]@java[`ActorRef>`] + to respond to when the update is completed + 1. a function that takes a previous state and updates it, in our case by incrementing it with 1 + +Whenever the distributed counter is updated, we cache the value so that we can answer requests about the value without +the extra interaction with the replicator using the `GetCachedValue` command. + +We also support asking the replicator, using the `GetValue`, demonstrating how many of the replicator commands take +a pass-along value that will be put in the response message so that we do not need to keep a local state tracking +what actors are waiting for responses, but can extract the `replyTo` actor from the replicator when it responds +with a `GetSuccess`. See the @ref[the untyped Distributed Data documentation](../distributed-data.md#using-the-replicator) +for more details about what interactions with the replicator there are. + + +### Replicated data types + +Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types. +For more details, read @ref[the untyped Distributed Data documentation](../distributed-data.md#data-types) + + +### Running separate instances of the replicator + +For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles, +it makes sense to start separate replicators, this needs to be done on all nodes, or +the group of nodes tagged with a specific role. To do this with the Typed Distributed Data you will first +have to start an untyped `Replicator` and pass it to the `Replicator.behavior` method that takes an untyped +actor ref. All such `Replicator`s must run on the same path in the untyped actor hierarchy. +