WIP ddata api for Akka Typed

This commit is contained in:
Patrik Nordwall 2017-09-18 12:16:44 +02:00
parent 9bd6ca4c8b
commit 341d4e5ba9
6 changed files with 279 additions and 1 deletions

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.scaladsl
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.GCounter
import akka.cluster.ddata.GCounterKey
import akka.cluster.ddata.ReplicatedData
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.Behavior
import akka.typed.TypedSpec
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
object ReplicatorSpec {
val config = ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""")
sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg
private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg
val Key = GCounterKey("counter")
def client(replicator: ActorRef[Replicator.Command[_]])(implicit cluster: Cluster): Behavior[ClientCommand] =
Actor.deferred[ClientCommand] { ctx
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
ctx.spawnAdapter(InternalUpdateResponse.apply)
val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
ctx.spawnAdapter(InternalGetResponse.apply)
Actor.immutable[ClientCommand] { (ctx, msg)
msg match {
case Increment
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter)
Actor.same
case GetValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same
case internal: InternalMsg internal match {
case InternalUpdateResponse(_) Actor.same // ok
case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked)))
val value = rsp.get(Key).value.toInt
replyTo ! value
Actor.same
case InternalGetResponse(rsp)
Actor.unhandled // not dealing with failures
}
}
}
}
}
class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
import ReplicatorSpec._
trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val settings = ReplicatorSettings(system)
implicit val cluster = Cluster(system.toUntyped)
def `API prototype`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))
val probe = TestProbe[Int]
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(1)
}
}
object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.internal
import akka.typed.cluster.ddata.scaladsl.Replicator
import akka.typed.cluster.ddata.scaladsl.ReplicatorSettings
import akka.annotation.InternalApi
import akka.typed.Behavior
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.cluster.{ ddata dd }
/**
* INTERNAL API
*/
@InternalApi private[akka] object ReplicatorBehavior {
import Replicator._
def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = {
val untypedReplicatorProps = dd.Replicator.props(settings)
Actor.deferred { ctx
// FIXME perhaps add supervisor for restarting
val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying")
Actor.immutable[Command[_]] { (ctx, msg)
msg match {
case cmd: Replicator.Get[_]
untypedReplicator.tell(
dd.Replicator.Get(cmd.key, cmd.consistency, cmd.request),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: Replicator.Update[_]
untypedReplicator.tell(
dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify),
sender = cmd.replyTo.toUntyped)
Actor.same
}
}
}
}
}

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.scaladsl
import akka.actor.NoSerializationVerificationNeeded
import akka.cluster.{ ddata dd }
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.cluster.ddata.internal.ReplicatorBehavior
object Replicator {
def behavior(settings: ReplicatorSettings): Behavior[Command[_]] =
ReplicatorBehavior.behavior(settings)
type ReadConsistency = dd.Replicator.ReadConsistency
val ReadLocal = dd.Replicator.ReadLocal
type ReadFrom = dd.Replicator.ReadFrom
type ReadMajority = dd.Replicator.ReadMajority
type ReadAll = dd.Replicator.ReadAll
type WriteConsistency = dd.Replicator.WriteConsistency
val WriteLocal = dd.Replicator.WriteLocal
type WriteTo = dd.Replicator.WriteTo
type WriteMajority = dd.Replicator.WriteMajority
type WriteAll = dd.Replicator.WriteAll
sealed trait Command[A <: ReplicatedData] {
def key: Key[A]
}
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]])
extends Command[A] {
}
type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A]
object GetSuccess {
def unapply[A <: ReplicatedData](rsp: GetSuccess[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request))
}
type GetSuccess[A <: ReplicatedData] = dd.Replicator.GetSuccess[A]
type NotFound[A <: ReplicatedData] = dd.Replicator.NotFound[A]
type GetFailure[A <: ReplicatedData] = dd.Replicator.GetFailure[A]
object Update {
/**
* Modify value of local `Replicator` and replicate with given `writeConsistency`.
*
* The current value for the `key` is passed to the `modify` function.
* If there is no current data value for the `key` the `initial` value will be
* passed to the `modify` function.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or local correlation data structures.
*/
def apply[A <: ReplicatedData](
key: Key[A], initial: A, writeConsistency: WriteConsistency,
request: Option[Any] = None)(modify: A A)(replyTo: ActorRef[UpdateResponse[A]]): Update[A] =
Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))(replyTo)
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A A): Option[A] A = {
case Some(data) modify(data)
case None modify(initial)
}
}
/**
* Send this message to the local `Replicator` to update a data value for the
* given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages.
*
* Note that the [[Replicator.Update$ companion]] object provides `apply` functions for convenient
* construction of this message.
*
* The current data value for the `key` is passed as parameter to the `modify` function.
* It is `None` if there is no value for the `key`, and otherwise `Some(data)`. The function
* is supposed to return the new value of the data, which will then be replicated according to
* the given `writeConsistency`.
*
* The `modify` function is called by the `Replicator` actor and must therefore be a pure
* function that only uses the data parameter and stable fields from enclosing scope. It must
* for example not access `sender()` reference of an enclosing actor.
*/
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,
request: Option[Any])(val modify: Option[A] A)(val replyTo: ActorRef[UpdateResponse[A]])
extends Command[A] with NoSerializationVerificationNeeded {
}
type UpdateResponse[A <: ReplicatedData] = dd.Replicator.UpdateResponse[A]
type UpdateSuccess[A <: ReplicatedData] = dd.Replicator.UpdateSuccess[A]
type UpdateFailure[A <: ReplicatedData] = dd.Replicator.UpdateFailure[A]
type UpdateTimeout[A <: ReplicatedData] = dd.Replicator.UpdateTimeout[A]
type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A]
type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A]
}

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.scaladsl
import akka.cluster.{ ddata dd }
import akka.typed.ActorSystem
import akka.typed.scaladsl.adapter._
import com.typesafe.config.Config
object ReplicatorSettings {
/**
* Create settings from the default configuration
* `akka.cluster.distributed-data`.
*/
def apply(system: ActorSystem[_]): ReplicatorSettings =
dd.ReplicatorSettings(system.toUntyped)
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.distributed-data`.
*/
def apply(config: Config): ReplicatorSettings =
dd.ReplicatorSettings(config)
}

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata
import akka.cluster.{ ddata dd }
package object scaladsl {
type ReplicatorSettings = dd.ReplicatorSettings
}

View file

@ -158,7 +158,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck")
.dependsOn(streamTestkit % "test->test", stream)
lazy val typed = akkaModule("akka-typed")
.dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test")
.dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test", distributedData)
lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->test")