diff --git a/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java new file mode 100644 index 0000000000..d994265d5a --- /dev/null +++ b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.cluster.ddata.javadsl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.util.Optional; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import akka.actor.ActorSystem; +import akka.cluster.Cluster; +import akka.cluster.ddata.GCounter; +import akka.cluster.ddata.GCounterKey; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.ReplicatedData; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.javadsl.TestKit; +import akka.typed.ActorRef; +import akka.typed.Behavior; +import akka.typed.javadsl.Actor; +import akka.typed.javadsl.Adapter; + +public class ReplicatorTest extends JUnitSuite { + + static interface ClientCommand { + } + + static final class Increment implements ClientCommand { + } + + static final class GetValue implements ClientCommand { + final ActorRef replyTo; + + GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static interface InternalMsg extends ClientCommand { + } + + static final class InternalUpdateResponse implements InternalMsg { + final Replicator.UpdateResponse rsp; + + public InternalUpdateResponse(Replicator.UpdateResponse rsp) { + this.rsp = rsp; + } + } + + static final class InternalGetResponse implements InternalMsg { + final Replicator.GetResponse rsp; + + public InternalGetResponse(Replicator.GetResponse rsp) { + this.rsp = rsp; + } + } + + static final Key Key = GCounterKey.create("counter"); + + static Behavior client(ActorRef> replicator, Cluster node) { + return Actor.deferred(c -> { + + final ActorRef> updateResponseAdapter = + c.spawnAdapter(m -> new InternalUpdateResponse<>(m)); + + final ActorRef> getResponseAdapter = + c.spawnAdapter(m -> new InternalGetResponse<>(m)); + + return Actor.immutable(ClientCommand.class) + .onMessage(Increment.class, (ctx, cmd) -> { + replicator.tell( + new Replicator.Update(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter, + curr -> curr.increment(node, 1))); + return Actor.same(); + }) + .onMessage(InternalUpdateResponse.class, (ctx, msg) -> { + return Actor.same(); + }) + .onMessage(GetValue.class, (ctx, cmd) -> { + replicator.tell( + new Replicator.Get(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); + return Actor.same(); + }) + .onMessage(InternalGetResponse.class, (ctx, msg) -> { + if (msg.rsp instanceof Replicator.GetSuccess) { + int value = ((Replicator.GetSuccess) msg.rsp).get(Key).getValue().intValue(); + ActorRef replyTo = (ActorRef) msg.rsp.request().get(); + replyTo.tell(value); + } else { + // not dealing with failures + } + return Actor.same(); + }) + .build(); + }); +} + + + static Config config = ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.netty.tcp.port = 0 \n" + + "akka.remote.artery.canonical.port = 0 \n"); + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest", + config); + + private final ActorSystem system = actorSystemResource.getSystem(); + + akka.typed.ActorSystem typedSystem() { + return Adapter.toTyped(system); + } + + + + @Test + public void apiPrototype() { + TestKit probe = new TestKit(system); + akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem()); + ActorRef> replicator = + Adapter.spawn(system, Replicator.behavior(settings), "replicator"); + ActorRef client = + Adapter.spawnAnonymous(system, client(replicator, Cluster.get(system))); + + client.tell(new Increment()); + client.tell(new GetValue(Adapter.toTyped(probe.getRef()))); + probe.expectMsg(1); + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala index 527dcac0fa..e8c1980b8d 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala @@ -3,39 +3,85 @@ */ package akka.typed.cluster.ddata.internal -import akka.typed.cluster.ddata.scaladsl.Replicator -import akka.typed.cluster.ddata.scaladsl.ReplicatorSettings +import scala.compat.java8.OptionConverters._ +import scala.concurrent.duration._ +import scala.concurrent.duration.Duration + import akka.annotation.InternalApi +import akka.cluster.{ ddata ⇒ dd } +import akka.pattern.ask import akka.typed.Behavior import akka.typed.scaladsl.Actor import akka.typed.scaladsl.adapter._ -import akka.cluster.{ ddata ⇒ dd } +import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] object ReplicatorBehavior { - import Replicator._ - def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = { + import akka.typed.cluster.ddata.javadsl.{ Replicator ⇒ JReplicator } + import akka.typed.cluster.ddata.scaladsl.{ Replicator ⇒ SReplicator } + + val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout + val additionalAskTimeout = 1.second + + def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command[_]] = { val untypedReplicatorProps = dd.Replicator.props(settings) Actor.deferred { ctx ⇒ // FIXME perhaps add supervisor for restarting val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying") - Actor.immutable[Command[_]] { (ctx, msg) ⇒ + Actor.immutable[SReplicator.Command[_]] { (ctx, msg) ⇒ msg match { - case cmd: Replicator.Get[_] ⇒ + case cmd: SReplicator.Get[_] ⇒ untypedReplicator.tell( dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request), sender = cmd.replyTo.toUntyped) Actor.same - case cmd: Replicator.Update[_] ⇒ + case cmd: SReplicator.Update[_] ⇒ untypedReplicator.tell( dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), sender = cmd.replyTo.toUntyped) Actor.same + + case cmd: JReplicator.Get[d] ⇒ + implicit val timeout = Timeout(cmd.consistency.timeout match { + case Duration.Zero ⇒ localAskTimeout + case t ⇒ t + additionalAskTimeout + }) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.Get(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala)) + .mapTo[dd.Replicator.GetResponse[d]].map { + case rsp: dd.Replicator.GetSuccess[d] ⇒ JReplicator.GetSuccess(rsp.key, rsp.request.asJava)(rsp.dataValue) + case rsp: dd.Replicator.NotFound[d] ⇒ JReplicator.NotFound(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.GetFailure[d] ⇒ JReplicator.GetFailure(rsp.key, rsp.request.asJava) + }.recover { + case _ ⇒ JReplicator.GetFailure(cmd.key, cmd.request) + } + reply.foreach { cmd.replyTo ! _ } + Actor.same + + case cmd: JReplicator.Update[d] ⇒ + implicit val timeout = Timeout(cmd.writeConsistency.timeout match { + case Duration.Zero ⇒ localAskTimeout + case t ⇒ t + additionalAskTimeout + }) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.Update(cmd.key, cmd.writeConsistency.toUntyped, cmd.request.asScala)(cmd.modify)) + .mapTo[dd.Replicator.UpdateResponse[d]].map { + case rsp: dd.Replicator.UpdateSuccess[d] ⇒ JReplicator.UpdateSuccess(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.UpdateTimeout[d] ⇒ JReplicator.UpdateTimeout(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.ModifyFailure[d] ⇒ JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause, rsp.request.asJava) + case rsp: dd.Replicator.StoreFailure[d] ⇒ JReplicator.StoreFailure(rsp.key, rsp.request.asJava) + }.recover { + case _ ⇒ JReplicator.UpdateTimeout(cmd.key, cmd.request) + } + reply.foreach { cmd.replyTo ! _ } + Actor.same } } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala new file mode 100644 index 0000000000..1f3e836045 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala @@ -0,0 +1,257 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.javadsl + +import akka.actor.NoSerializationVerificationNeeded + +import java.util.function.{ Function ⇒ JFunction } +import akka.cluster.{ ddata ⇒ dd } +import akka.typed.cluster.ddata.scaladsl +import akka.cluster.ddata.Key +import akka.cluster.ddata.ReplicatedData +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.cluster.ddata.internal.ReplicatorBehavior + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.Duration +import java.util.Optional +import akka.actor.DeadLetterSuppression +import akka.annotation.InternalApi + +object Replicator { + import dd.Replicator.DefaultMajorityMinCap + + def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] = + ReplicatorBehavior.behavior(settings).narrow[Command[_]] + + sealed trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] { + def key: Key[A] + } + + sealed trait ReadConsistency { + def timeout: FiniteDuration + + /** INTERNAL API */ + @InternalApi private[akka] def toUntyped: dd.Replicator.ReadConsistency + } + case object ReadLocal extends ReadConsistency { + override def timeout: FiniteDuration = Duration.Zero + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadLocal + } + final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { + require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout) + } + final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { + def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout, minCap) + } + final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency { + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout) + } + + sealed trait WriteConsistency { + def timeout: FiniteDuration + + /** INTERNAL API */ + @InternalApi private[akka] def toUntyped: dd.Replicator.WriteConsistency + } + case object WriteLocal extends WriteConsistency { + override def timeout: FiniteDuration = Duration.Zero + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteLocal + } + final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { + require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout) + } + final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { + def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout, minCap) + } + final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency { + /** INTERNAL API */ + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout) + } + + /** + * The `ReadLocal` instance + */ + def readLocal: ReadConsistency = ReadLocal + + /** + * The `WriteLocal` instance + */ + def writeLocal: WriteConsistency = WriteLocal + + /** + * Send this message to the local `Replicator` to retrieve a data value for the + * given `key`. The `Replicator` will reply with one of the [[GetResponse]] messages. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or maintain local correlation data structures. + */ + final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]], request: Optional[Any]) + extends Command[A] { + + def this(key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]]) = + this(key, consistency, replyTo, Optional.empty[Any]) + } + + sealed abstract class GetResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + def request: Optional[Any] + def getRequest: Optional[Any] = request + } + /** + * Reply from `Get`. The data value is retrieved with [[#get]] using the typed key. + */ + final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any])(data: A) + extends GetResponse[A] { + + /** + * The data value, with correct type. + * Scala pattern matching cannot infer the type from the `key` parameter. + */ + def get[T <: ReplicatedData](key: Key[T]): T = { + require(key == this.key, "wrong key used, must use contained key") + data.asInstanceOf[T] + } + + /** + * The data value. Use [[#get]] to get the fully typed value. + */ + def dataValue: A = data + } + final case class NotFound[A <: ReplicatedData](key: Key[A], request: Optional[Any]) + extends GetResponse[A] + /** + * The [[Get]] request could not be fulfill according to the given + * [[ReadConsistency consistency level]] and [[ReadConsistency#timeout timeout]]. + */ + final case class GetFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any]) + extends GetResponse[A] + + object Update { + + private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = { + case Some(data) ⇒ modify(data) + case None ⇒ modify(initial) + } + } + /** + * Send this message to the local `Replicator` to update a data value for the + * given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages. + * + * Note that the [[Replicator.Update$ companion]] object provides `apply` functions for convenient + * construction of this message. + * + * The current data value for the `key` is passed as parameter to the `modify` function. + * It is `None` if there is no value for the `key`, and otherwise `Some(data)`. The function + * is supposed to return the new value of the data, which will then be replicated according to + * the given `writeConsistency`. + * + * The `modify` function is called by the `Replicator` actor and must therefore be a pure + * function that only uses the data parameter and stable fields from enclosing scope. It must + * for example not access `sender()` reference of an enclosing actor. + */ + final case class Update[A <: ReplicatedData] private (key: Key[A], writeConsistency: WriteConsistency, + replyTo: ActorRef[UpdateResponse[A]], request: Optional[Any])(val modify: Option[A] ⇒ A) + extends Command[A] with NoSerializationVerificationNeeded { + + /** + * Modify value of local `Replicator` and replicate with given `writeConsistency`. + * + * The current value for the `key` is passed to the `modify` function. + * If there is no current data value for the `key` the `initial` value will be + * passed to the `modify` function. + */ + def this( + key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]], modify: JFunction[A, A]) = + this(key, writeConsistency, replyTo, Optional.empty[Any])( + Update.modifyWithInitial(initial, data ⇒ modify.apply(data))) + + /** + * Java API: Modify value of local `Replicator` and replicate with given `writeConsistency`. + * + * The current value for the `key` is passed to the `modify` function. + * If there is no current data value for the `key` the `initial` value will be + * passed to the `modify` function. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or local correlation data structures. + */ + def this( + key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]], + request: Optional[Any], modify: JFunction[A, A]) = + this(key, writeConsistency, replyTo, request)(Update.modifyWithInitial(initial, data ⇒ modify.apply(data))) + + } + + sealed abstract class UpdateResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + def request: Optional[Any] + def getRequest: Optional[Any] = request + } + final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any]) + extends UpdateResponse[A] with DeadLetterSuppression + + sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A] + + /** + * The direct replication of the [[Update]] could not be fulfill according to + * the given [[WriteConsistency consistency level]] and + * [[WriteConsistency#timeout timeout]]. + * + * The `Update` was still performed locally and possibly replicated to some nodes. + * It will eventually be disseminated to other replicas, unless the local replica + * crashes before it has been able to communicate with other replicas. + */ + final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends UpdateFailure[A] + /** + * If the `modify` function of the [[Update]] throws an exception the reply message + * will be this `ModifyFailure` message. The original exception is included as `cause`. + */ + final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Optional[Any]) + extends UpdateFailure[A] { + override def toString: String = s"ModifyFailure [$key]: $errorMessage" + } + /** + * The local store or direct replication of the [[Update]] could not be fulfill according to + * the given [[WriteConsistency consistency level]] due to durable store errors. This is + * only used for entries that have been configured to be durable. + * + * The `Update` was still performed in memory locally and possibly replicated to some nodes, + * but it might not have been written to durable storage. + * It will eventually be disseminated to other replicas, unless the local replica + * crashes before it has been able to communicate with other replicas. + */ + final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any]) + extends UpdateFailure[A] with DeleteResponse[A] { + + override def getRequest: Optional[Any] = request + } + + sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + def request: Optional[Any] + def getRequest: Optional[Any] = request + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/ReplicatorSettings.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/ReplicatorSettings.scala new file mode 100644 index 0000000000..380ecb46b0 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/ReplicatorSettings.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.javadsl + +import akka.cluster.{ ddata ⇒ dd } +import akka.typed.ActorSystem +import akka.typed.scaladsl.adapter._ +import com.typesafe.config.Config + +object ReplicatorSettings { + /** + * Create settings from the default configuration + * `akka.cluster.distributed-data`. + */ + def apply(system: ActorSystem[_]): dd.ReplicatorSettings = + dd.ReplicatorSettings(system.toUntyped) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.distributed-data`. + */ + def apply(config: Config): dd.ReplicatorSettings = + dd.ReplicatorSettings(config) +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala index d7b5650753..deeae84f24 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala @@ -29,7 +29,7 @@ object Replicator { type WriteMajority = dd.Replicator.WriteMajority type WriteAll = dd.Replicator.WriteAll - sealed trait Command[A <: ReplicatedData] { + trait Command[A <: ReplicatedData] { def key: Key[A] }