diff --git a/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java index 2ed2a6ad55..145cd5e194 100644 --- a/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java +++ b/akka-typed-tests/src/test/java/akka/typed/cluster/ddata/javadsl/ReplicatorTest.java @@ -81,7 +81,7 @@ public class ReplicatorTest extends JUnitSuite { static final Key Key = GCounterKey.create("counter"); static class Client extends MutableBehavior { - private final ActorRef> replicator; + private final ActorRef replicator; private final Cluster node; final ActorRef> updateResponseAdapter; final ActorRef> getResponseAdapter; @@ -89,7 +89,7 @@ public class ReplicatorTest extends JUnitSuite { private int cachedValue = 0; - public Client(ActorRef> replicator, Cluster node, ActorContext ctx) { + public Client(ActorRef replicator, Cluster node, ActorContext ctx) { this.replicator = replicator; this.node = node; @@ -102,7 +102,7 @@ public class ReplicatorTest extends JUnitSuite { replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter)); } - public static Behavior create(ActorRef> replicator, Cluster node) { + public static Behavior create(ActorRef replicator, Cluster node) { return Actor.mutable(ctx -> new Client(replicator, node, ctx)); } @@ -168,7 +168,7 @@ public class ReplicatorTest extends JUnitSuite { public void shouldHaveApiForUpdateAndGet() { TestKit probe = new TestKit(system); akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem()); - ActorRef> replicator = + ActorRef replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef client = Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); @@ -182,7 +182,7 @@ public class ReplicatorTest extends JUnitSuite { public void shouldHaveApiForSubscribe() { TestKit probe = new TestKit(system); akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem()); - ActorRef> replicator = + ActorRef replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef client = Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system))); diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala index a357dec258..ee5d48ae35 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSpec.scala @@ -41,7 +41,7 @@ object ReplicatorSpec { val Key = GCounterKey("counter") - def client(replicator: ActorRef[Replicator.Command[_]])(implicit cluster: Cluster): Behavior[ClientCommand] = + def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] = Actor.deferred[ClientCommand] { ctx ⇒ val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] = ctx.spawnAdapter(InternalUpdateResponse.apply) @@ -58,15 +58,15 @@ object ReplicatorSpec { Actor.immutable[ClientCommand] { (ctx, msg) ⇒ msg match { case Increment ⇒ - replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter) + replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ + 1) Actor.same case GetValue(replyTo) ⇒ - replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) + replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo)) Actor.same case GetCachedValue(replyTo) ⇒ - replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter) + replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo)) Actor.same case internal: InternalMsg ⇒ internal match { diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala index 6071184197..cffe705261 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/internal/ReplicatorBehavior.scala @@ -6,6 +6,7 @@ package akka.typed.cluster.ddata.internal import scala.compat.java8.OptionConverters._ import scala.concurrent.duration._ import scala.concurrent.duration.Duration +import scala.concurrent.Future import akka.annotation.InternalApi import akka.cluster.{ ddata ⇒ dd } @@ -27,14 +28,12 @@ import akka.typed.Terminated import akka.typed.cluster.ddata.scaladsl.{ Replicator ⇒ SReplicator } private case class InternalChanged[A <: ReplicatedData](chg: dd.Replicator.Changed[A], subscriber: ActorRef[JReplicator.Changed[A]]) - extends JReplicator.Command[A] { - override def key: Key[A] = chg.key - } + extends JReplicator.Command val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout val additionalAskTimeout = 1.second - def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command[_]] = { + def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command] = { val untypedReplicatorProps = dd.Replicator.props(settings) Actor.deferred { ctx ⇒ @@ -42,9 +41,9 @@ import akka.typed.Terminated val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying") def withState( - subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command[_]] = { + subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command] = { - def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command[_]] = { + def stopSubscribeAdapter(subscriber: ActorRef[JReplicator.Changed[ReplicatedData]]): Behavior[SReplicator.Command] = { subscribeAdapters.get(subscriber) match { case Some(adapter) ⇒ // will be unsubscribed from untypedReplicator via Terminated @@ -55,7 +54,7 @@ import akka.typed.Terminated } } - Actor.immutable[SReplicator.Command[_]] { (ctx, msg) ⇒ + Actor.immutable[SReplicator.Command] { (ctx, msg) ⇒ msg match { case cmd: SReplicator.Get[_] ⇒ untypedReplicator.tell( @@ -63,12 +62,6 @@ import akka.typed.Terminated sender = cmd.replyTo.toUntyped) Actor.same - case cmd: SReplicator.Update[_] ⇒ - untypedReplicator.tell( - dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), - sender = cmd.replyTo.toUntyped) - Actor.same - case cmd: JReplicator.Get[d] ⇒ implicit val timeout = Timeout(cmd.consistency.timeout match { case Duration.Zero ⇒ localAskTimeout @@ -87,6 +80,12 @@ import akka.typed.Terminated reply.foreach { cmd.replyTo ! _ } Actor.same + case cmd: SReplicator.Update[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Update(cmd.key, cmd.writeConsistency, cmd.request)(cmd.modify), + sender = cmd.replyTo.toUntyped) + Actor.same + case cmd: JReplicator.Update[d] ⇒ implicit val timeout = Timeout(cmd.writeConsistency.timeout match { case Duration.Zero ⇒ localAskTimeout @@ -136,6 +135,48 @@ import akka.typed.Terminated case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked ⇒ stopSubscribeAdapter(cmd.subscriber) + case cmd: SReplicator.Delete[_] ⇒ + untypedReplicator.tell( + dd.Replicator.Delete(cmd.key, cmd.consistency, cmd.request), + sender = cmd.replyTo.toUntyped) + Actor.same + + case cmd: JReplicator.Delete[d] ⇒ + implicit val timeout = Timeout(cmd.consistency.timeout match { + case Duration.Zero ⇒ localAskTimeout + case t ⇒ t + additionalAskTimeout + }) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.Delete(cmd.key, cmd.consistency.toUntyped, cmd.request.asScala)) + .mapTo[dd.Replicator.DeleteResponse[d]].map { + case rsp: dd.Replicator.DeleteSuccess[d] ⇒ JReplicator.DeleteSuccess(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.ReplicationDeleteFailure[d] ⇒ JReplicator.ReplicationDeleteFailure(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.DataDeleted[d] ⇒ JReplicator.DataDeleted(rsp.key, rsp.request.asJava) + case rsp: dd.Replicator.StoreFailure[d] ⇒ JReplicator.StoreFailure(rsp.key, rsp.request.asJava) + }.recover { + case _ ⇒ JReplicator.ReplicationDeleteFailure(cmd.key, cmd.request) + } + reply.foreach { cmd.replyTo ! _ } + Actor.same + + case SReplicator.GetReplicaCount(replyTo) ⇒ + untypedReplicator.tell(dd.Replicator.GetReplicaCount, sender = replyTo.toUntyped) + Actor.same + + case JReplicator.GetReplicaCount(replyTo) ⇒ + implicit val timeout = Timeout(localAskTimeout) + import ctx.executionContext + val reply = + (untypedReplicator ? dd.Replicator.GetReplicaCount) + .mapTo[dd.Replicator.ReplicaCount].map(rsp ⇒ JReplicator.ReplicaCount(rsp.n)) + reply.foreach { replyTo ! _ } + Actor.same + + case SReplicator.FlushChanges | JReplicator.FlushChanges ⇒ + untypedReplicator.tell(dd.Replicator.FlushChanges, sender = akka.actor.ActorRef.noSender) + Actor.same + } } .onSignal { diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala index 240aad9538..c354e9a443 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/javadsl/Replicator.scala @@ -20,16 +20,18 @@ import java.util.Optional import akka.actor.DeadLetterSuppression import akka.annotation.InternalApi import akka.annotation.DoNotInherit +import scala.util.control.NoStackTrace +/** + * @see [[akka.cluster.ddata.Replicator]]. + */ object Replicator { import dd.Replicator.DefaultMajorityMinCap - def behavior(settings: dd.ReplicatorSettings): Behavior[Command[_]] = - ReplicatorBehavior.behavior(settings).narrow[Command[_]] + def behavior(settings: dd.ReplicatorSettings): Behavior[Command] = + ReplicatorBehavior.behavior(settings).narrow[Command] - @DoNotInherit trait Command[A <: ReplicatedData] extends scaladsl.Replicator.Command[A] { - def key: Key[A] - } + @DoNotInherit trait Command extends scaladsl.Replicator.Command sealed trait ReadConsistency { def timeout: FiniteDuration @@ -108,7 +110,7 @@ object Replicator { * or maintain local correlation data structures. */ final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]], request: Optional[Any]) - extends Command[A] { + extends Command { def this(key: Key[A], consistency: ReadConsistency, replyTo: ActorRef[GetResponse[A]]) = this(key, consistency, replyTo, Optional.empty[Any]) @@ -127,7 +129,6 @@ object Replicator { /** * The data value, with correct type. - * Scala pattern matching cannot infer the type from the `key` parameter. */ def get[T <: ReplicatedData](key: Key[T]): T = { require(key == this.key, "wrong key used, must use contained key") @@ -170,7 +171,7 @@ object Replicator { */ final case class Update[A <: ReplicatedData] private (key: Key[A], writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]], request: Optional[Any])(val modify: Option[A] ⇒ A) - extends Command[A] with NoSerializationVerificationNeeded { + extends Command with NoSerializationVerificationNeeded { /** * Modify value of local `Replicator` and replicate with given `writeConsistency`. @@ -246,12 +247,6 @@ object Replicator { override def getRequest: Optional[Any] = request } - sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { - def key: Key[A] - def request: Optional[Any] - def getRequest: Optional[Any] = request - } - /** * Register a subscriber that will be notified with a [[Changed]] message * when the value of the given `key` is changed. Current value is also @@ -266,13 +261,13 @@ object Replicator { * If the key is deleted the subscriber is notified with a [[Deleted]] * message. */ - final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A] + final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command /** * Unregister a subscriber. * * @see [[Replicator.Subscribe]] */ - final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command[A] + final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command /** * The data value is retrieved with [[#get]] using the typed key. * @@ -294,4 +289,54 @@ object Replicator { def dataValue: A = data } + /** + * Send this message to the local `Replicator` to delete a data value for the + * given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or maintain local correlation data structures. + */ + final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency, + replyTo: ActorRef[DeleteResponse[A]], request: Optional[Any]) + extends Command with NoSerializationVerificationNeeded { + + def this(key: Key[A], consistency: WriteConsistency, replyTo: ActorRef[DeleteResponse[A]]) = + this(key, consistency, replyTo, Optional.empty()) + } + + sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + def request: Optional[Any] + def getRequest: Optional[Any] = request + } + final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends DeleteResponse[A] + final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Optional[Any]) extends DeleteResponse[A] + final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Optional[Any]) + extends RuntimeException with NoStackTrace with DeleteResponse[A] { + override def toString: String = s"DataDeleted [$key]" + } + + /** + * Get current number of replicas, including the local replica. + * Will reply to sender with [[ReplicaCount]]. + */ + final case class GetReplicaCount(replyTo: ActorRef[ReplicaCount]) extends Command + + /** + * Current number of replicas. Reply to `GetReplicaCount`. + */ + final case class ReplicaCount(n: Int) + + /** + * INTERNAL API + */ + @InternalApi private[akka] case object FlushChanges extends Command + + /** + * The `FlushChanges` instance. Notify subscribers of changes now, otherwise they will be notified periodically + * with the configured `notify-subscribers-interval`. + */ + def flushChanges: Command = FlushChanges + } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala index c33b317439..cb090170ad 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/Replicator.scala @@ -12,9 +12,15 @@ import akka.typed.ActorRef import akka.typed.Behavior import akka.typed.cluster.ddata.internal.ReplicatorBehavior +/** + * @see [[akka.cluster.ddata.Replicator]]. + */ object Replicator { - def behavior(settings: ReplicatorSettings): Behavior[Command[_]] = + /** + * The `Behavior` for the `Replicator` actor. + */ + def behavior(settings: ReplicatorSettings): Behavior[Command] = ReplicatorBehavior.behavior(settings) type ReadConsistency = dd.Replicator.ReadConsistency @@ -29,19 +35,32 @@ object Replicator { type WriteMajority = dd.Replicator.WriteMajority type WriteAll = dd.Replicator.WriteAll - trait Command[A <: ReplicatedData] { - def key: Key[A] - } + trait Command - final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, request: Option[Any] = None)(val replyTo: ActorRef[GetResponse[A]]) - extends Command[A] + /** + * Send this message to the local `Replicator` to retrieve a data value for the + * given `key`. The `Replicator` will reply with one of the [[GetResponse]] messages. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or maintain local correlation data structures. + */ + final case class Get[A <: ReplicatedData](key: Key[A], consistency: ReadConsistency, + replyTo: ActorRef[GetResponse[A]], request: Option[Any] = None) extends Command + /** + * Reply from `Get`. The data value is retrieved with [[#get]] using the typed key. + */ type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A] object GetSuccess { def unapply[A <: ReplicatedData](rsp: GetSuccess[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request)) } type GetSuccess[A <: ReplicatedData] = dd.Replicator.GetSuccess[A] type NotFound[A <: ReplicatedData] = dd.Replicator.NotFound[A] + /** + * The [[Get]] request could not be fulfill according to the given + * [[ReadConsistency consistency level]] and [[ReadConsistency#timeout timeout]]. + */ type GetFailure[A <: ReplicatedData] = dd.Replicator.GetFailure[A] object Update { @@ -58,15 +77,16 @@ object Replicator { * or local correlation data structures. */ def apply[A <: ReplicatedData]( - key: Key[A], initial: A, writeConsistency: WriteConsistency, - request: Option[Any] = None)(modify: A ⇒ A)(replyTo: ActorRef[UpdateResponse[A]]): Update[A] = - Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))(replyTo) + key: Key[A], initial: A, writeConsistency: WriteConsistency, replyTo: ActorRef[UpdateResponse[A]], + request: Option[Any] = None)(modify: A ⇒ A): Update[A] = + Update(key, writeConsistency, replyTo, request)(modifyWithInitial(initial, modify)) private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = { case Some(data) ⇒ modify(data) case None ⇒ modify(initial) } } + /** * Send this message to the local `Replicator` to update a data value for the * given `key`. The `Replicator` will reply with one of the [[UpdateResponse]] messages. @@ -84,15 +104,39 @@ object Replicator { * for example not access `sender()` reference of an enclosing actor. */ final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency, - request: Option[Any])(val modify: Option[A] ⇒ A)(val replyTo: ActorRef[UpdateResponse[A]]) - extends Command[A] with NoSerializationVerificationNeeded { + replyTo: ActorRef[UpdateResponse[A]], + request: Option[Any])(val modify: Option[A] ⇒ A) + extends Command with NoSerializationVerificationNeeded { } type UpdateResponse[A <: ReplicatedData] = dd.Replicator.UpdateResponse[A] type UpdateSuccess[A <: ReplicatedData] = dd.Replicator.UpdateSuccess[A] type UpdateFailure[A <: ReplicatedData] = dd.Replicator.UpdateFailure[A] + /** + * The direct replication of the [[Update]] could not be fulfill according to + * the given [[WriteConsistency consistency level]] and + * [[WriteConsistency#timeout timeout]]. + * + * The `Update` was still performed locally and possibly replicated to some nodes. + * It will eventually be disseminated to other replicas, unless the local replica + * crashes before it has been able to communicate with other replicas. + */ type UpdateTimeout[A <: ReplicatedData] = dd.Replicator.UpdateTimeout[A] + /** + * If the `modify` function of the [[Update]] throws an exception the reply message + * will be this `ModifyFailure` message. The original exception is included as `cause`. + */ type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A] + /** + * The local store or direct replication of the [[Update]] could not be fulfill according to + * the given [[WriteConsistency consistency level]] due to durable store errors. This is + * only used for entries that have been configured to be durable. + * + * The `Update` was still performed in memory locally and possibly replicated to some nodes, + * but it might not have been written to durable storage. + * It will eventually be disseminated to other replicas, unless the local replica + * crashes before it has been able to communicate with other replicas. + */ type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A] /** @@ -110,7 +154,7 @@ object Replicator { * message. */ final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) - extends Command[A] + extends Command /** * Unregister a subscriber. @@ -118,11 +162,50 @@ object Replicator { * @see [[Replicator.Subscribe]] */ final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) - extends Command[A] + extends Command object Changed { def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key) } + /** + * The data value is retrieved with [[#get]] using the typed key. + * + * @see [[Replicator.Subscribe]] + */ type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A] + /** + * Send this message to the local `Replicator` to delete a data value for the + * given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or maintain local correlation data structures. + */ + final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency, + replyTo: ActorRef[DeleteResponse[A]], request: Option[Any]) + extends Command with NoSerializationVerificationNeeded + + type DeleteResponse[A <: ReplicatedData] = dd.Replicator.DeleteResponse[A] + type DeleteSuccess[A <: ReplicatedData] = dd.Replicator.DeleteSuccess[A] + type ReplicationDeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A] + type DataDeleted[A <: ReplicatedData] = dd.Replicator.DataDeleted[A] + + /** + * Get current number of replicas, including the local replica. + * Will reply to sender with [[ReplicaCount]]. + */ + final case class GetReplicaCount(replyTo: ActorRef[ReplicaCount]) extends Command + + /** + * Current number of replicas. Reply to `GetReplicaCount`. + */ + type ReplicaCount = dd.Replicator.ReplicaCount + + /** + * Notify subscribers of changes now, otherwise they will be notified periodically + * with the configured `notify-subscribers-interval`. + */ + object FlushChanges extends Command + } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala index d8f0317702..9b3e4a3455 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/ReplicatorSettings.scala @@ -8,6 +8,9 @@ import akka.typed.ActorSystem import akka.typed.scaladsl.adapter._ import com.typesafe.config.Config +/** + * @see [[akka.cluster.ddata.ReplicatorSettings]]. + */ object ReplicatorSettings { /** * Create settings from the default configuration diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala index e144131038..d82d40127e 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ddata/scaladsl/package.scala @@ -6,5 +6,8 @@ package akka.typed.cluster.ddata import akka.cluster.{ ddata ⇒ dd } package object scaladsl { + /** + * @see [[akka.cluster.ddata.ReplicatorSettings]]. + */ type ReplicatorSettings = dd.ReplicatorSettings }