diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala new file mode 100644 index 0000000000..67dff73e90 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.scaladsl + +import scala.concurrent.duration._ + +import akka.cluster.Cluster + +import akka.cluster.ddata.GCounter +import akka.cluster.ddata.GCounterKey +import akka.cluster.ddata.ReplicatedData +import akka.typed.ActorRef +import akka.typed.ActorSystem +import akka.typed.Behavior +import akka.typed.TypedSpec +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.AskPattern._ +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl._ +import com.typesafe.config.ConfigFactory + +object ReplicatorSpec { + + val config = ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + """) + + sealed trait ClientCommand + final case object Increment extends ClientCommand + final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand + private sealed trait InternalMsg extends ClientCommand + private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg + private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg + + val Key = GCounterKey("counter") + + def client(replicator: ActorRef[Replicator.Command[_]])(implicit cluster: Cluster): Behavior[ClientCommand] = + Actor.deferred[ClientCommand] { ctx ⇒ + val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] = + ctx.spawnAdapter(InternalUpdateResponse.apply) + + val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] = + ctx.spawnAdapter(InternalGetResponse.apply) + + Actor.immutable[ClientCommand] { (ctx, msg) ⇒ + msg match { + case Increment ⇒ + replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter) + Actor.same + + case GetValue(replyTo) ⇒ + replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) + Actor.same + + case internal: InternalMsg ⇒ internal match { + case InternalUpdateResponse(_) ⇒ Actor.same // ok + + case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) ⇒ + val value = rsp.get(Key).value.toInt + replyTo ! value + Actor.same + + case InternalGetResponse(rsp) ⇒ + Actor.unhandled // not dealing with failures + } + } + } + } +} + +class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) { + import ReplicatorSpec._ + + trait RealTests extends StartSupport { + implicit def system: ActorSystem[TypedSpec.Command] + implicit val testSettings = TestKitSettings(system) + val settings = ReplicatorSettings(system) + implicit val cluster = Cluster(system.toUntyped) + + def `API prototype`(): Unit = { + + val replicator = start(Replicator.behavior(settings)) + + val c = start(client(replicator)) + + val probe = TestProbe[Int] + c ! Increment + c ! GetValue(probe.ref) + probe.expectMsg(1) + } + + } + + object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala new file mode 100644 index 0000000000..527dcac0fa --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.internal + +import akka.typed.cluster.ddata.scaladsl.Replicator +import akka.typed.cluster.ddata.scaladsl.ReplicatorSettings +import akka.annotation.InternalApi +import akka.typed.Behavior +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.cluster.{ ddata ⇒ dd } + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ReplicatorBehavior { + import Replicator._ + def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = { + val untypedReplicatorProps = dd.Replicator.props(settings) + + Actor.deferred { ctx ⇒ + // FIXME perhaps add supervisor for restarting + val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying") + + Actor.immutable[Command[_]] { (ctx, msg) ⇒ + msg match { + case cmd: Replicator.Get[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request), + sender = cmd.replyTo.toUntyped) + Actor.same + + case cmd: Replicator.Update[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), + sender = cmd.replyTo.toUntyped) + Actor.same + } + } + + } + } +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala new file mode 100644 index 0000000000..d7b5650753 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.scaladsl + +import akka.actor.NoSerializationVerificationNeeded + +import akka.cluster.{ ddata ⇒ dd } +import akka.cluster.ddata.Key +import akka.cluster.ddata.ReplicatedData +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.cluster.ddata.internal.ReplicatorBehavior + +object Replicator { + + def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = + ReplicatorBehavior.behavior(settings) + + type ReadConsistency = dd.Replicator.ReadConsistency + val ReadLocal = dd.Replicator.ReadLocal + type ReadFrom = dd.Replicator.ReadFrom + type ReadMajority = dd.Replicator.ReadMajority + type ReadAll = dd.Replicator.ReadAll + + type WriteConsistency = dd.Replicator.WriteConsistency + val WriteLocal = dd.Replicator.WriteLocal + type WriteTo = dd.Replicator.WriteTo + type WriteMajority = dd.Replicator.WriteMajority + type WriteAll = dd.Replicator.WriteAll + + sealed trait Command[A <: ReplicatedData] { + def key: Key[A] + } + + final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]]) + extends Command[A] { + + } + + type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A] + object GetSuccess { + def unapply[A <: ReplicatedData](rsp: GetSuccess[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request)) + } + type GetSuccess[A <: ReplicatedData] = dd.Replicator.GetSuccess[A] + type NotFound[A <: ReplicatedData] = dd.Replicator.NotFound[A] + type GetFailure[A <: ReplicatedData] = dd.Replicator.GetFailure[A] + + object Update { + + /** + * Modify value of local `Replicator` and replicate with given `writeConsistency`. + * + * The current value for the `key` is passed to the `modify` function. + * If there is no current data value for the `key` the `initial` value will be + * passed to the `modify` function. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or local correlation data structures. + */ + def apply[A <: ReplicatedData]( + key: Key[A], initial: A, writeConsistency: WriteConsistency, + request: Option[Any] = None)(modify: A ⇒ A)(replyTo: ActorRef[UpdateResponse[A]]): Update[A] = + Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))(replyTo) + + private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = { + case Some(data) ⇒ modify(data) + case None ⇒ modify(initial) + } + } + /** + * Send this message to the local `Replicator` to update a data value for the + * given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages. + * + * Note that the [[Replicator.Update$ companion]] object provides `apply` functions for convenient + * construction of this message. + * + * The current data value for the `key` is passed as parameter to the `modify` function. + * It is `None` if there is no value for the `key`, and otherwise `Some(data)`. The function + * is supposed to return the new value of the data, which will then be replicated according to + * the given `writeConsistency`. + * + * The `modify` function is called by the `Replicator` actor and must therefore be a pure + * function that only uses the data parameter and stable fields from enclosing scope. It must + * for example not access `sender()` reference of an enclosing actor. + */ + final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency, + request: Option[Any])(val modify: Option[A] ⇒ A)(val replyTo: ActorRef[UpdateResponse[A]]) + extends Command[A] with NoSerializationVerificationNeeded { + } + + type UpdateResponse[A <: ReplicatedData] = dd.Replicator.UpdateResponse[A] + type UpdateSuccess[A <: ReplicatedData] = dd.Replicator.UpdateSuccess[A] + type UpdateFailure[A <: ReplicatedData] = dd.Replicator.UpdateFailure[A] + type UpdateTimeout[A <: ReplicatedData] = dd.Replicator.UpdateTimeout[A] + type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A] + type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A] + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala new file mode 100644 index 0000000000..d8f0317702 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata.scaladsl + +import akka.cluster.{ ddata ⇒ dd } +import akka.typed.ActorSystem +import akka.typed.scaladsl.adapter._ +import com.typesafe.config.Config + +object ReplicatorSettings { + /** + * Create settings from the default configuration + * `akka.cluster.distributed-data`. + */ + def apply(system: ActorSystem[_]): ReplicatorSettings = + dd.ReplicatorSettings(system.toUntyped) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.distributed-data`. + */ + def apply(config: Config): ReplicatorSettings = + dd.ReplicatorSettings(config) +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala new file mode 100644 index 0000000000..e144131038 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.ddata + +import akka.cluster.{ ddata ⇒ dd } + +package object scaladsl { + type ReplicatorSettings = dd.ReplicatorSettings +} diff --git a/build.sbt b/build.sbt index 5db6120695..87128366e0 100644 --- a/build.sbt +++ b/build.sbt @@ -158,7 +158,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck") .dependsOn(streamTestkit % "test->test", stream) lazy val typed = akkaModule("akka-typed") - .dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test") + .dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test", distributedData) lazy val typedTests = akkaModule("akka-typed-tests") .dependsOn(typed, typedTestkit % "compile->compile;test->test")