add remaining commands

This commit is contained in:
Patrik Nordwall 2017-09-20 11:35:02 +02:00
parent eee449ccb3
commit 9eb8d3f853
7 changed files with 226 additions and 51 deletions

View file

@ -81,7 +81,7 @@ public class ReplicatorTest extends JUnitSuite {
static final Key<GCounter> Key = GCounterKey.create("counter");
static class Client extends MutableBehavior<ClientCommand> {
private final ActorRef<Replicator.Command<?>> replicator;
private final ActorRef<Replicator.Command> replicator;
private final Cluster node;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter;
@ -89,7 +89,7 @@ public class ReplicatorTest extends JUnitSuite {
private int cachedValue = 0;
public Client(ActorRef<Command<?>> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
public Client(ActorRef<Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
this.replicator = replicator;
this.node = node;
@ -102,7 +102,7 @@ public class ReplicatorTest extends JUnitSuite {
replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}
public static Behavior<ClientCommand> create(ActorRef<Command<?>> replicator, Cluster node) {
public static Behavior<ClientCommand> create(ActorRef<Command> replicator, Cluster node) {
return Actor.mutable(ctx -> new Client(replicator, node, ctx));
}
@ -168,7 +168,7 @@ public class ReplicatorTest extends JUnitSuite {
public void shouldHaveApiForUpdateAndGet() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));
@ -182,7 +182,7 @@ public class ReplicatorTest extends JUnitSuite {
public void shouldHaveApiForSubscribe() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));

View file

@ -41,7 +41,7 @@ object ReplicatorSpec {
val Key = GCounterKey("counter")
def client(replicator: ActorRef[Replicator.Command[_]])(implicit cluster: Cluster): Behavior[ClientCommand] =
def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
Actor.deferred[ClientCommand] { ctx
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
ctx.spawnAdapter(InternalUpdateResponse.apply)
@ -58,15 +58,15 @@ object ReplicatorSpec {
Actor.immutable[ClientCommand] { (ctx, msg)
msg match {
case Increment
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter)
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ + 1)
Actor.same
case GetValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
Actor.same
case GetCachedValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
Actor.same
case internal: InternalMsg internal match {

View file

@ -6,6 +6,7 @@ package akka.typed.cluster.ddata.internal
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import akka.annotation.InternalApi
import akka.cluster.{ ddata dd }
@ -27,14 +28,12 @@ import akka.typed.Terminated
import akka.typed.cluster.ddata.scaladsl.{ Replicator SReplicator }
private case class InternalChanged[A <: ReplicatedData](chg: dd.Replicator.Changed[A], subscriber: ActorRef[JReplicator.Changed[A]])
extends JReplicator.Command[A] {
override def key: Key[A] = chg.key
}
extends JReplicator.Command
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
val additionalAskTimeout = 1.second
def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command[_]] = {
def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command] = {
val untypedReplicatorProps = dd.Replicator.props(settings)
Actor.deferred { ctx
@ -42,9 +41,9 @@ import akka.typed.Terminated
val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying")
def withState(
subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command[_]] = {
subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command] = {
def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command[_]] = {
def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command] = {
subscribeAdapters.get(subscriber) match {
case Some(adapter)
// will be unsubscribed from untypedReplicator via Terminated
@ -55,7 +54,7 @@ import akka.typed.Terminated
}
}
Actor.immutable[SReplicator.Command[_]] { (ctx, msg)
Actor.immutable[SReplicator.Command] { (ctx, msg)
msg match {
case cmd: SReplicator.Get[_]
untypedReplicator.tell(
@ -63,12 +62,6 @@ import akka.typed.Terminated
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: SReplicator.Update[_]
untypedReplicator.tell(
dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: JReplicator.Get[d]
implicit val timeout = Timeout(cmd.consistency.timeout match {
case Duration.Zero localAskTimeout
@ -87,6 +80,12 @@ import akka.typed.Terminated
reply.foreach { cmd.replyTo ! _ }
Actor.same
case cmd: SReplicator.Update[_]
untypedReplicator.tell(
dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: JReplicator.Update[d]
implicit val timeout = Timeout(cmd.writeConsistency.timeout match {
case Duration.Zero localAskTimeout
@ -136,6 +135,48 @@ import akka.typed.Terminated
case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked
stopSubscribeAdapter(cmd.subscriber)
case cmd: SReplicator.Delete[_]
untypedReplicator.tell(
dd.Replicator.Delete(cmd.key, cmd.consistency, cmd.request),
sender = cmd.replyTo.toUntyped)
Actor.same
case cmd: JReplicator.Delete[d]
implicit val timeout = Timeout(cmd.consistency.timeout match {
case Duration.Zero localAskTimeout
case t t + additionalAskTimeout
})
import ctx.executionContext
val reply =
(untypedReplicator ? dd.Replicator.Delete(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala))
.mapTo[dd.Replicator.DeleteResponse[d]].map {
case rsp: dd.Replicator.DeleteSuccess[d] JReplicator.DeleteSuccess(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.ReplicationDeleteFailure[d] JReplicator.ReplicationDeleteFailure(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.DataDeleted[d] JReplicator.DataDeleted(rsp.key, rsp.request.asJava)
case rsp: dd.Replicator.StoreFailure[d] JReplicator.StoreFailure(rsp.key, rsp.request.asJava)
}.recover {
case _ JReplicator.ReplicationDeleteFailure(cmd.key, cmd.request)
}
reply.foreach { cmd.replyTo ! _ }
Actor.same
case SReplicator.GetReplicaCount(replyTo)
untypedReplicator.tell(dd.Replicator.GetReplicaCount, sender = replyTo.toUntyped)
Actor.same
case JReplicator.GetReplicaCount(replyTo)
implicit val timeout = Timeout(localAskTimeout)
import ctx.executionContext
val reply =
(untypedReplicator ? dd.Replicator.GetReplicaCount)
.mapTo[dd.Replicator.ReplicaCount].map(rsp JReplicator.ReplicaCount(rsp.n))
reply.foreach { replyTo ! _ }
Actor.same
case SReplicator.FlushChanges | JReplicator.FlushChanges
untypedReplicator.tell(dd.Replicator.FlushChanges, sender = akka.actor.ActorRef.noSender)
Actor.same
}
}
.onSignal {

View file

@ -20,16 +20,18 @@ import java.util.Optional
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
import akka.annotation.DoNotInherit
import scala.util.control.NoStackTrace
/**
* @see [[akka.cluster.ddata.Replicator]].
*/
object Replicator {
import dd.Replicator.DefaultMajorityMinCap
def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] =
ReplicatorBehavior.behavior(settings).narrow[Command[_]]
def behavior(settings: dd.ReplicatorSettings): Behavior[Command] =
ReplicatorBehavior.behavior(settings).narrow[Command]
@DoNotInherit trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] {
def key: Key[A]
}
@DoNotInherit trait Command extends scaladsl.Replicator.Command
sealed trait ReadConsistency {
def timeout: FiniteDuration
@ -108,7 +110,7 @@ object Replicator {
* or maintain local correlation data structures.
*/
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]], request: Optional[Any])
extends Command[A] {
extends Command {
def this(key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]]) =
this(key, consistency, replyTo, Optional.empty[Any])
@ -127,7 +129,6 @@ object Replicator {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
@ -170,7 +171,7 @@ object Replicator {
*/
final case class Update[A <: ReplicatedData] private (key: Key[A], writeConsistency: WriteConsistency,
replyTo: ActorRef[UpdateResponse[A]], request: Optional[Any])(val modify: Option[A] A)
extends Command[A] with NoSerializationVerificationNeeded {
extends Command with NoSerializationVerificationNeeded {
/**
* Modify value of local `Replicator` and replicate with given `writeConsistency`.
@ -246,12 +247,6 @@ object Replicator {
override def getRequest: Optional[Any] = request
}
sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
def request: Optional[Any]
def getRequest: Optional[Any] = request
}
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
@ -266,13 +261,13 @@ object Replicator {
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A]
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A]
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
/**
* The data value is retrieved with [[#get]] using the typed key.
*
@ -294,4 +289,54 @@ object Replicator {
def dataValue: A = data
}
/**
* Send this message to the local `Replicator` to delete a data value for the
* given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or maintain local correlation data structures.
*/
final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency,
replyTo: ActorRef[DeleteResponse[A]], request: Optional[Any])
extends Command with NoSerializationVerificationNeeded {
def this(key: Key[A], consistency: WriteConsistency, replyTo: ActorRef[DeleteResponse[A]]) =
this(key, consistency, replyTo, Optional.empty())
}
sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
def request: Optional[Any]
def getRequest: Optional[Any] = request
}
final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends DeleteResponse[A]
final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends DeleteResponse[A]
final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Optional[Any])
extends RuntimeException with NoStackTrace with DeleteResponse[A] {
override def toString: String = s"DataDeleted [$key]"
}
/**
* Get current number of replicas, including the local replica.
* Will reply to sender with [[ReplicaCount]].
*/
final case class GetReplicaCount(replyTo: ActorRef[ReplicaCount]) extends Command
/**
* Current number of replicas. Reply to `GetReplicaCount`.
*/
final case class ReplicaCount(n: Int)
/**
* INTERNAL API
*/
@InternalApi private[akka] case object FlushChanges extends Command
/**
* The `FlushChanges` instance. Notify subscribers of changes now, otherwise they will be notified periodically
* with the configured `notify-subscribers-interval`.
*/
def flushChanges: Command = FlushChanges
}

View file

@ -12,9 +12,15 @@ import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.cluster.ddata.internal.ReplicatorBehavior
/**
* @see [[akka.cluster.ddata.Replicator]].
*/
object Replicator {
def behavior(settings: ReplicatorSettings): Behavior[Command[_]] =
/**
* The `Behavior` for the `Replicator` actor.
*/
def behavior(settings: ReplicatorSettings): Behavior[Command] =
ReplicatorBehavior.behavior(settings)
type ReadConsistency = dd.Replicator.ReadConsistency
@ -29,19 +35,32 @@ object Replicator {
type WriteMajority = dd.Replicator.WriteMajority
type WriteAll = dd.Replicator.WriteAll
trait Command[A <: ReplicatedData] {
def key: Key[A]
}
trait Command
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]])
extends Command[A]
/**
* Send this message to the local `Replicator` to retrieve a data value for the
* given `key`. The `Replicator` will reply with one of the [[GetResponse]] messages.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or maintain local correlation data structures.
*/
final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency,
replyTo: ActorRef[GetResponse[A]], request: Option[Any] = None) extends Command
/**
* Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
*/
type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A]
object GetSuccess {
def unapply[A <: ReplicatedData](rsp: GetSuccess[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request))
}
type GetSuccess[A <: ReplicatedData] = dd.Replicator.GetSuccess[A]
type NotFound[A <: ReplicatedData] = dd.Replicator.NotFound[A]
/**
* The [[Get]] request could not be fulfill according to the given
* [[ReadConsistency consistency level]] and [[ReadConsistency#timeout timeout]].
*/
type GetFailure[A <: ReplicatedData] = dd.Replicator.GetFailure[A]
object Update {
@ -58,15 +77,16 @@ object Replicator {
* or local correlation data structures.
*/
def apply[A <: ReplicatedData](
key: Key[A], initial: A, writeConsistency: WriteConsistency,
request: Option[Any] = None)(modify: A A)(replyTo: ActorRef[UpdateResponse[A]]): Update[A] =
Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))(replyTo)
key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]],
request: Option[Any] = None)(modify: A A): Update[A] =
Update(key, writeConsistency, replyTo, request)(modifyWithInitial(initial, modify))
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A A): Option[A] A = {
case Some(data) modify(data)
case None modify(initial)
}
}
/**
* Send this message to the local `Replicator` to update a data value for the
* given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages.
@ -84,15 +104,39 @@ object Replicator {
* for example not access `sender()` reference of an enclosing actor.
*/
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,
request: Option[Any])(val modify: Option[A] A)(val replyTo: ActorRef[UpdateResponse[A]])
extends Command[A] with NoSerializationVerificationNeeded {
replyTo: ActorRef[UpdateResponse[A]],
request: Option[Any])(val modify: Option[A] A)
extends Command with NoSerializationVerificationNeeded {
}
type UpdateResponse[A <: ReplicatedData] = dd.Replicator.UpdateResponse[A]
type UpdateSuccess[A <: ReplicatedData] = dd.Replicator.UpdateSuccess[A]
type UpdateFailure[A <: ReplicatedData] = dd.Replicator.UpdateFailure[A]
/**
* The direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] and
* [[WriteConsistency#timeout timeout]].
*
* The `Update` was still performed locally and possibly replicated to some nodes.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
type UpdateTimeout[A <: ReplicatedData] = dd.Replicator.UpdateTimeout[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
*/
type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A]
/**
* The local store or direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] due to durable store errors. This is
* only used for entries that have been configured to be durable.
*
* The `Update` was still performed in memory locally and possibly replicated to some nodes,
* but it might not have been written to durable storage.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A]
/**
@ -110,7 +154,7 @@ object Replicator {
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]])
extends Command[A]
extends Command
/**
* Unregister a subscriber.
@ -118,11 +162,50 @@ object Replicator {
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]])
extends Command[A]
extends Command
object Changed {
def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key)
}
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A]
/**
* Send this message to the local `Replicator` to delete a data value for the
* given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages.
*
* The optional `request` context is included in the reply messages. This is a convenient
* way to pass contextual information (e.g. original sender) without having to use `ask`
* or maintain local correlation data structures.
*/
final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency,
replyTo: ActorRef[DeleteResponse[A]], request: Option[Any])
extends Command with NoSerializationVerificationNeeded
type DeleteResponse[A <: ReplicatedData] = dd.Replicator.DeleteResponse[A]
type DeleteSuccess[A <: ReplicatedData] = dd.Replicator.DeleteSuccess[A]
type ReplicationDeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A]
type DataDeleted[A <: ReplicatedData] = dd.Replicator.DataDeleted[A]
/**
* Get current number of replicas, including the local replica.
* Will reply to sender with [[ReplicaCount]].
*/
final case class GetReplicaCount(replyTo: ActorRef[ReplicaCount]) extends Command
/**
* Current number of replicas. Reply to `GetReplicaCount`.
*/
type ReplicaCount = dd.Replicator.ReplicaCount
/**
* Notify subscribers of changes now, otherwise they will be notified periodically
* with the configured `notify-subscribers-interval`.
*/
object FlushChanges extends Command
}

View file

@ -8,6 +8,9 @@ import akka.typed.ActorSystem
import akka.typed.scaladsl.adapter._
import com.typesafe.config.Config
/**
* @see [[akka.cluster.ddata.ReplicatorSettings]].
*/
object ReplicatorSettings {
/**
* Create settings from the default configuration

View file

@ -6,5 +6,8 @@ package akka.typed.cluster.ddata
import akka.cluster.{ ddata dd }
package object scaladsl {
/**
* @see [[akka.cluster.ddata.ReplicatorSettings]].
*/
type ReplicatorSettings = dd.ReplicatorSettings
}