first stab at javadsl for Typed ddata

This commit is contained in:
Patrik Nordwall 2017-09-19 07:24:51 +02:00
parent 341d4e5ba9
commit 9496b59289
5 changed files with 471 additions and 9 deletions

View file

@ -0,0 +1,134 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.javadsl;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Optional;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.GCounter;
import akka.cluster.ddata.GCounterKey;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ReplicatedData;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.javadsl.TestKit;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.javadsl.Actor;
import akka.typed.javadsl.Adapter;
public class ReplicatorTest extends JUnitSuite {
static interface ClientCommand {
}
static final class Increment implements ClientCommand {
}
static final class GetValue implements ClientCommand {
final ActorRef<Integer> replyTo;
GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
static interface InternalMsg extends ClientCommand {
}
static final class InternalUpdateResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.UpdateResponse<A> rsp;
public InternalUpdateResponse(Replicator.UpdateResponse<A> rsp) {
this.rsp = rsp;
}
}
static final class InternalGetResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.GetResponse<A> rsp;
public InternalGetResponse(Replicator.GetResponse<A> rsp) {
this.rsp = rsp;
}
}
static final Key<GCounter> Key = GCounterKey.create("counter");
static Behavior<ClientCommand> client(ActorRef<Replicator.Command<?>> replicator, Cluster node) {
return Actor.deferred(c -> {
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter =
c.spawnAdapter(m -> new InternalUpdateResponse<>(m));
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter =
c.spawnAdapter(m -> new InternalGetResponse<>(m));
return Actor.immutable(ClientCommand.class)
.onMessage(Increment.class, (ctx, cmd) -> {
replicator.tell(
new Replicator.Update<GCounter>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter,
curr -> curr.increment(node, 1)));
return Actor.same();
})
.onMessage(InternalUpdateResponse.class, (ctx, msg) -> {
return Actor.same();
})
.onMessage(GetValue.class, (ctx, cmd) -> {
replicator.tell(
new Replicator.Get<GCounter>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return Actor.same();
})
.onMessage(InternalGetResponse.class, (ctx, msg) -> {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
replyTo.tell(value);
} else {
// not dealing with failures
}
return Actor.same();
})
.build();
});
}
static Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.netty.tcp.port = 0 \n" +
"akka.remote.artery.canonical.port = 0 \n");
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest",
config);
private final ActorSystem system = actorSystemResource.getSystem();
akka.typed.ActorSystem<?> typedSystem() {
return Adapter.toTyped(system);
}
@Test
public void apiPrototype() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
Adapter.spawn(system, Replicator.behavior(settings), "replicator");
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, client(replicator, Cluster.get(system)));
client.tell(new Increment());
client.tell(new GetValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(1);
}
}

View file

@ -3,39 +3,85 @@
*/
package akka.typed.cluster.ddata.internal
import akka.typed.cluster.ddata.scaladsl.Replicator
import akka.typed.cluster.ddata.scaladsl.ReplicatorSettings
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import scala.concurrent.duration.Duration
import akka.annotation.InternalApi
import akka.cluster.{ ddata dd }
import akka.pattern.ask
import akka.typed.Behavior
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.cluster.{ ddata dd }
import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] object ReplicatorBehavior {
import Replicator._
def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = {
import akka.typed.cluster.ddata.javadsl.{ Replicator JReplicator }
import akka.typed.cluster.ddata.scaladsl.{ Replicator SReplicator }
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
val additionalAskTimeout = 1.second
def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command[_]] = {
val untypedReplicatorProps = dd.Replicator.props(settings)
Actor.deferred { ctx
// FIXME perhaps add supervisor for restarting
val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying")
Actor.immutable[Command[_]] { (ctx, msg)
Actor.immutable[SReplicator.Command[_]] { (ctx, msg)
msg match {
case cmd: Replicator.Get[_]
case cmd: SReplicator.Get[_]
untypedReplicator.tell(
dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: Replicator.Update[_]
case cmd: SReplicator.Update[_]
untypedReplicator.tell(
dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: JReplicator.Get[d]
implicit val timeout = Timeout(cmd.consistency.timeout match {
case Duration.Zero localAskTimeout
case t t + additionalAskTimeout
})
import ctx.executionContext
val reply =
(untypedReplicator ? dd.Replicator.Get(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala))
.mapTo[dd.Replicator.GetResponse[d]].map {
case rsp: dd.Replicator.GetSuccess[d] JReplicator.GetSuccess(rsp.key, rsp.request.asJava)(rsp.dataValue)
case rsp: dd.Replicator.NotFound[d] JReplicator.NotFound(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.GetFailure[d] JReplicator.GetFailure(rsp.key, rsp.request.asJava)
}.recover {
case _ JReplicator.GetFailure(cmd.key, cmd.request)
}
reply.foreach { cmd.replyTo ! _ }
Actor.same
case cmd: JReplicator.Update[d]
implicit val timeout = Timeout(cmd.writeConsistency.timeout match {
case Duration.Zero localAskTimeout
case t t + additionalAskTimeout
})
import ctx.executionContext
val reply =
(untypedReplicator ? dd.Replicator.Update(cmd.key, cmd.writeConsistency.toUntyped, cmd.request.asScala)(cmd.modify))
.mapTo[dd.Replicator.UpdateResponse[d]].map {
case rsp: dd.Replicator.UpdateSuccess[d] JReplicator.UpdateSuccess(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.UpdateTimeout[d] JReplicator.UpdateTimeout(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.ModifyFailure[d] JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause, rsp.request.asJava)
case rsp: dd.Replicator.StoreFailure[d] JReplicator.StoreFailure(rsp.key, rsp.request.asJava)
}.recover {
case _ JReplicator.UpdateTimeout(cmd.key, cmd.request)
}
reply.foreach { cmd.replyTo ! _ }
Actor.same
}
}

View file

@ -0,0 +1,257 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.javadsl
import akka.actor.NoSerializationVerificationNeeded
import java.util.function.{ Function JFunction }
import akka.cluster.{ ddata dd }
import akka.typed.cluster.ddata.scaladsl
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.cluster.ddata.internal.ReplicatorBehavior
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration
import java.util.Optional
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
object Replicator {
import dd.Replicator.DefaultMajorityMinCap
def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] =
ReplicatorBehavior.behavior(settings).narrow[Command[_]]
sealed trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] {
def key: Key[A]
}
sealed trait ReadConsistency {
def timeout: FiniteDuration
/** INTERNAL API */
@InternalApi private[akka] def toUntyped: dd.Replicator.ReadConsistency
}
case object ReadLocal extends ReadConsistency {
override def timeout: FiniteDuration = Duration.Zero
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadLocal
}
final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency {
require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1")
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout)
}
final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout, minCap)
}
final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency {
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout)
}
sealed trait WriteConsistency {
def timeout: FiniteDuration
/** INTERNAL API */
@InternalApi private[akka] def toUntyped: dd.Replicator.WriteConsistency
}
case object WriteLocal extends WriteConsistency {
override def timeout: FiniteDuration = Duration.Zero
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteLocal
}
final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency {
require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1")
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout)
}
final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout, minCap)
}
final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency {
/** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout)
}
/**
* The `ReadLocal` instance
*/
def readLocal: ReadConsistency = ReadLocal
/**
* The `WriteLocal` instance
*/
def writeLocal: WriteConsistency = WriteLocal
/**
* Send this message to the local `Replicator` to retrieve a data value for the
* given `key`. The `Replicator` will reply with one of the [[GetResponse]] messages.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or maintain local correlation data structures.
*/
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]], request: Optional[Any])
extends Command[A] {
def this(key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]]) =
this(key, consistency, replyTo, Optional.empty[Any])
}
sealed abstract class GetResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
def request: Optional[Any]
def getRequest: Optional[Any] = request
}
/**
* Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
*/
final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any])(data: A)
extends GetResponse[A] {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
final case class NotFound[A <: ReplicatedData](key: Key[A], request: Optional[Any])
extends GetResponse[A]
/**
* The [[Get]] request could not be fulfill according to the given
* [[ReadConsistency consistency level]] and [[ReadConsistency#timeout timeout]].
*/
final case class GetFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any])
extends GetResponse[A]
object Update {
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A A): Option[A] A = {
case Some(data) modify(data)
case None modify(initial)
}
}
/**
* Send this message to the local `Replicator` to update a data value for the
* given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages.
*
* Note that the [[Replicator.Update$ companion]] object provides `apply` functions for convenient
* construction of this message.
*
* The current data value for the `key` is passed as parameter to the `modify` function.
* It is `None` if there is no value for the `key`, and otherwise `Some(data)`. The function
* is supposed to return the new value of the data, which will then be replicated according to
* the given `writeConsistency`.
*
* The `modify` function is called by the `Replicator` actor and must therefore be a pure
* function that only uses the data parameter and stable fields from enclosing scope. It must
* for example not access `sender()` reference of an enclosing actor.
*/
final case class Update[A <: ReplicatedData] private (key: Key[A], writeConsistency: WriteConsistency,
replyTo: ActorRef[UpdateResponse[A]], request: Optional[Any])(val modify: Option[A] A)
extends Command[A] with NoSerializationVerificationNeeded {
/**
* Modify value of local `Replicator` and replicate with given `writeConsistency`.
*
* The current value for the `key` is passed to the `modify` function.
* If there is no current data value for the `key` the `initial` value will be
* passed to the `modify` function.
*/
def this(
key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]], modify: JFunction[A, A]) =
this(key, writeConsistency, replyTo, Optional.empty[Any])(
Update.modifyWithInitial(initial, data modify.apply(data)))
/**
* Java API: Modify value of local `Replicator` and replicate with given `writeConsistency`.
*
* The current value for the `key` is passed to the `modify` function.
* If there is no current data value for the `key` the `initial` value will be
* passed to the `modify` function.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or local correlation data structures.
*/
def this(
key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]],
request: Optional[Any], modify: JFunction[A, A]) =
this(key, writeConsistency, replyTo, request)(Update.modifyWithInitial(initial, data modify.apply(data)))
}
sealed abstract class UpdateResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
def request: Optional[Any]
def getRequest: Optional[Any] = request
}
final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any])
extends UpdateResponse[A] with DeadLetterSuppression
sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]
/**
* The direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] and
* [[WriteConsistency#timeout timeout]].
*
* The `Update` was still performed locally and possibly replicated to some nodes.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends UpdateFailure[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
*/
final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Optional[Any])
extends UpdateFailure[A] {
override def toString: String = s"ModifyFailure [$key]: $errorMessage"
}
/**
* The local store or direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] due to durable store errors. This is
* only used for entries that have been configured to be durable.
*
* The `Update` was still performed in memory locally and possibly replicated to some nodes,
* but it might not have been written to durable storage.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any])
extends UpdateFailure[A] with DeleteResponse[A] {
override def getRequest: Optional[Any] = request
}
sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
def request: Optional[Any]
def getRequest: Optional[Any] = request
}
}

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.javadsl
import akka.cluster.{ ddata dd }
import akka.typed.ActorSystem
import akka.typed.scaladsl.adapter._
import com.typesafe.config.Config
object ReplicatorSettings {
/**
* Create settings from the default configuration
* `akka.cluster.distributed-data`.
*/
def apply(system: ActorSystem[_]): dd.ReplicatorSettings =
dd.ReplicatorSettings(system.toUntyped)
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.distributed-data`.
*/
def apply(config: Config): dd.ReplicatorSettings =
dd.ReplicatorSettings(config)
}

View file

@ -29,7 +29,7 @@ object Replicator {
type WriteMajority = dd.Replicator.WriteMajority
type WriteAll = dd.Replicator.WriteAll
sealed trait Command[A <: ReplicatedData] {
trait Command[A <: ReplicatedData] {
def key: Key[A]
}