From 1d09d2725bff64216763fad5b94054bd112afbf9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 16 Aug 2019 07:33:41 +0200 Subject: [PATCH] API changes related to DataDeleted, #27371 * The reason for this change is that `DataDeleted` didn't extend the `UpdateResponse` and `GetResponse` types and could therefore cause problems when `Update` and `Get` were used with `ask`. This was also a problem for Akka Typed. * Introduce new messages types UpdateDataDeleted and GetDataDeleted * Introduce SubscribeResponse because the responses can be both `Changed` and `Deleted` are responses to subscriptions. Important for Typed. --- .../typed/internal/ReplicatorBehavior.scala | 30 ++++++----- .../ddata/typed/javadsl/Replicator.scala | 41 +++++++++----- .../javadsl/ReplicatorMessageAdapter.scala | 10 ++-- .../ddata/typed/scaladsl/Replicator.scala | 43 +++++++++++++-- .../scaladsl/ReplicatorMessageAdapter.scala | 9 ++-- .../ddata/typed/javadsl/ReplicatorTest.java | 33 +++++++----- .../ddata/typed/scaladsl/ReplicatorSpec.scala | 29 ++++++---- .../scala/akka/cluster/ddata/Replicator.scala | 54 +++++++++++++------ .../cluster/ddata/ReplicatorChaosSpec.scala | 2 +- .../akka/cluster/ddata/ReplicatorSpec.scala | 24 +++++++-- .../project/migration-guide-2.5.x-2.6.x.md | 50 +++++++++++------ 11 files changed, 228 insertions(+), 97 deletions(-) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala index 9c39f09a45..3159d9cab6 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala @@ -25,9 +25,9 @@ import akka.actor.typed.Terminated import akka.cluster.ddata.typed.javadsl.{ Replicator => JReplicator } import akka.cluster.ddata.typed.scaladsl.{ Replicator => SReplicator } - private case class InternalChanged[A <: ReplicatedData]( - chg: dd.Replicator.Changed[A], - subscriber: ActorRef[JReplicator.Changed[A]]) + private case class InternalSubscribeResponse[A <: ReplicatedData]( + chg: dd.Replicator.SubscribeResponse[A], + subscriber: ActorRef[JReplicator.SubscribeResponse[A]]) extends JReplicator.Command val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout @@ -82,8 +82,9 @@ import akka.actor.typed.Terminated .map { case rsp: dd.Replicator.GetSuccess[d] => JReplicator.GetSuccess(rsp.key)(rsp.dataValue) - case rsp: dd.Replicator.NotFound[d] => JReplicator.NotFound(rsp.key) - case rsp: dd.Replicator.GetFailure[d] => JReplicator.GetFailure(rsp.key) + case rsp: dd.Replicator.NotFound[d] => JReplicator.NotFound(rsp.key) + case rsp: dd.Replicator.GetFailure[d] => JReplicator.GetFailure(rsp.key) + case rsp: dd.Replicator.GetDataDeleted[d] => JReplicator.GetDataDeleted(rsp.key) } .recover { case _ => JReplicator.GetFailure(cmd.key) @@ -111,7 +112,8 @@ import akka.actor.typed.Terminated case rsp: dd.Replicator.UpdateTimeout[d] => JReplicator.UpdateTimeout(rsp.key) case rsp: dd.Replicator.ModifyFailure[d] => JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause) - case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key) + case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key) + case rsp: dd.Replicator.UpdateDataDeleted[d] => JReplicator.UpdateDataDeleted(rsp.key) } .recover { case _ => JReplicator.UpdateTimeout(cmd.key) @@ -130,8 +132,9 @@ import akka.actor.typed.Terminated // For the Java API the Changed messages must be mapped to the JReplicator.Changed class. // That is done with an adapter, and we have to keep track of the lifecycle of the original // subscriber and stop the adapter when the original subscriber is stopped. - val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnMessageAdapter { chg => - InternalChanged(chg, cmd.subscriber) + val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter { + rsp => + InternalSubscribeResponse(rsp, cmd.subscriber) } classicReplicator.tell( @@ -142,8 +145,11 @@ import akka.actor.typed.Terminated withState(subscribeAdapters.updated(cmd.subscriber, adapter)) - case InternalChanged(chg, subscriber) => - subscriber ! JReplicator.Changed(chg.key)(chg.dataValue) + case InternalSubscribeResponse(rsp, subscriber) => + rsp match { + case chg: dd.Replicator.Changed[_] => subscriber ! JReplicator.Changed(chg.key)(chg.dataValue) + case del: dd.Replicator.Deleted[_] => subscriber ! JReplicator.Deleted(del.key) + } Behaviors.same case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked => @@ -165,12 +171,12 @@ import akka.actor.typed.Terminated .map { case rsp: dd.Replicator.DeleteSuccess[d] => JReplicator.DeleteSuccess(rsp.key) case rsp: dd.Replicator.ReplicationDeleteFailure[d] => - JReplicator.ReplicationDeleteFailure(rsp.key) + JReplicator.DeleteFailure(rsp.key) case rsp: dd.Replicator.DataDeleted[d] => JReplicator.DataDeleted(rsp.key) case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key) } .recover { - case _ => JReplicator.ReplicationDeleteFailure(cmd.key) + case _ => JReplicator.DeleteFailure(cmd.key) } reply.foreach { cmd.replyTo ! _ } Behaviors.same diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala index 70b7224d99..1f2c5a1e96 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala @@ -7,8 +7,6 @@ package akka.cluster.ddata.typed.javadsl import java.time.Duration import java.util.function.{ Function => JFunction } -import scala.util.control.NoStackTrace - import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.DeadLetterSuppression @@ -152,6 +150,11 @@ object Replicator { */ final case class GetFailure[A <: ReplicatedData](key: Key[A]) extends GetResponse[A] + /** + * The [[Get]] request couldn't be performed because the entry has been deleted. + */ + final case class GetDataDeleted[A <: ReplicatedData](key: Key[A]) extends GetResponse[A] + object Update { private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A => A): Option[A] => A = { @@ -215,6 +218,11 @@ object Replicator { */ final case class UpdateTimeout[A <: ReplicatedData](key: Key[A]) extends UpdateFailure[A] + /** + * The [[Update]] couldn't be performed because the entry has been deleted. + */ + final case class UpdateDataDeleted[A <: ReplicatedData](key: Key[A]) extends UpdateResponse[A] + /** * If the `modify` function of the [[Update]] throws an exception the reply message * will be this `ModifyFailure` message. The original exception is included as `cause`. @@ -250,21 +258,30 @@ object Replicator { * If the key is deleted the subscriber is notified with a [[Deleted]] * message. */ - final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command + final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]]) + extends Command /** * Unregister a subscriber. * * @see [[Replicator.Subscribe]] */ - final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command + final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]]) + extends Command + + /** + * @see [[Replicator.Subscribe]] + */ + sealed trait SubscribeResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + } /** * The data value is retrieved with [[#get]] using the typed key. * * @see [[Replicator.Subscribe]] */ - final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) { + final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends SubscribeResponse[A] { /** * The data value, with correct type. @@ -281,6 +298,11 @@ object Replicator { def dataValue: A = data } + /** + * @see [[Replicator.Subscribe]] + */ + final case class Deleted[A <: ReplicatedData](key: Key[A]) extends SubscribeResponse[A] + /** * Send this message to the local `Replicator` to delete a data value for the * given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages. @@ -296,13 +318,8 @@ object Replicator { def key: Key[A] } final case class DeleteSuccess[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] - final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] - final case class DataDeleted[A <: ReplicatedData](key: Key[A]) - extends RuntimeException - with NoStackTrace - with DeleteResponse[A] { - override def toString: String = s"DataDeleted [$key]" - } + final case class DeleteFailure[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] + final case class DataDeleted[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] /** * Get current number of replicas, including the local replica. diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala index aad27b8be3..b265d1f5ee 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/ReplicatorMessageAdapter.scala @@ -57,21 +57,21 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( private implicit val askTimeout: Timeout = Timeout(unexpectedAskTimeout.asScala) - private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.Changed[B]]] = Map.empty + private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.SubscribeResponse[B]]] = Map.empty /** - * Subscribe to changes of the given `key`. The [[Replicator.Changed]] messages from + * Subscribe to changes of the given `key`. The [[Replicator.Changed]] and [[Replicator.Deleted]] messages from * the replicator are transformed to the message protocol of the requesting actor with * the given `responseAdapter` function. */ - def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.Changed[B], A]): Unit = { + def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.SubscribeResponse[B], A]): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) changedMessageAdapters.get(key).foreach { subscriber => replicator ! Replicator.Unsubscribe(key, subscriber) } - val replyTo: ActorRef[Replicator.Changed[B]] = - context.messageAdapter(classOf[Replicator.Changed[B]], responseAdapter) + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = + context.messageAdapter(classOf[Replicator.SubscribeResponse[B]], responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo) replicator ! Replicator.Subscribe(key, replyTo) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala index 1aa4122bd1..d22593d079 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala @@ -86,6 +86,15 @@ object Replicator { def unapply[A <: ReplicatedData](rsp: GetFailure[A]): Option[Key[A]] = Some(rsp.key) } + /** + * The [[Get]] request couldn't be performed because the entry has been deleted. + */ + type GetDataDeleted[A <: ReplicatedData] = dd.Replicator.GetDataDeleted[A] + object GetDataDeleted { + def unapply[A <: ReplicatedData](rsp: GetDataDeleted[A]): Option[Key[A]] = + Some(rsp.key) + } + object Update { /** @@ -165,6 +174,15 @@ object Replicator { Some(rsp.key) } + /** + * The [[Update]] couldn't be performed because the entry has been deleted. + */ + type UpdateDataDeleted[A <: ReplicatedData] = dd.Replicator.UpdateDataDeleted[A] + object UpdateDataDeleted { + def unapply[A <: ReplicatedData](rsp: UpdateDataDeleted[A]): Option[Key[A]] = + Some(rsp.key) + } + /** * If the `modify` function of the [[Update]] throws an exception the reply message * will be this `ModifyFailure` message. The original exception is included as `cause`. @@ -214,6 +232,16 @@ object Replicator { */ final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command + /** + * @see [[Replicator.Subscribe]] + */ + type SubscribeResponse[A <: ReplicatedData] = dd.Replicator.SubscribeResponse[A] + + /** + * The data value is retrieved with [[#get]] using the typed key. + * + * @see [[Replicator.Subscribe]] + */ object Changed { def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key) } @@ -225,6 +253,15 @@ object Replicator { */ type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A] + object Deleted { + def unapply[A <: ReplicatedData](del: Deleted[A]): Option[Key[A]] = Some(del.key) + } + + /** + * @see [[Replicator.Subscribe]] + */ + type Deleted[A <: ReplicatedData] = dd.Replicator.Deleted[A] + object Delete { /** @@ -253,9 +290,9 @@ object Replicator { def unapply[A <: ReplicatedData](rsp: DeleteSuccess[A]): Option[Key[A]] = Some(rsp.key) } - type ReplicationDeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A] - object ReplicationDeleteFailure { - def unapply[A <: ReplicatedData](rsp: ReplicationDeleteFailure[A]): Option[Key[A]] = + type DeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A] + object DeleteFailure { + def unapply[A <: ReplicatedData](rsp: DeleteFailure[A]): Option[Key[A]] = Some(rsp.key) } type DataDeleted[A <: ReplicatedData] = dd.Replicator.DataDeleted[A] diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala index d6061dbb5c..3ec4c07e0b 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorMessageAdapter.scala @@ -61,20 +61,21 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData]( private implicit val askTimeout: Timeout = Timeout(unexpectedAskTimeout) - private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.Changed[B]]] = Map.empty + private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.SubscribeResponse[B]]] = Map.empty /** - * Subscribe to changes of the given `key`. The [[Replicator.Changed]] messages from + * Subscribe to changes of the given `key`. The [[Replicator.Changed]] and [[Replicator.Deleted]] messages from * the replicator are transformed to the message protocol of the requesting actor with * the given `responseAdapter` function. */ - def subscribe(key: Key[B], responseAdapter: Replicator.Changed[B] => A): Unit = { + def subscribe(key: Key[B], responseAdapter: Replicator.SubscribeResponse[B] => A): Unit = { // unsubscribe in case it's called more than once per key unsubscribe(key) changedMessageAdapters.get(key).foreach { subscriber => replicator ! Replicator.Unsubscribe(key, subscriber) } - val replyTo: ActorRef[Replicator.Changed[B]] = context.messageAdapter[Replicator.Changed[B]](responseAdapter) + val replyTo: ActorRef[Replicator.SubscribeResponse[B]] = + context.messageAdapter[Replicator.SubscribeResponse[B]](responseAdapter) changedMessageAdapters = changedMessageAdapters.updated(key, replyTo) replicator ! Replicator.Subscribe(key, replyTo) } diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java index 344ddcd62c..b506a9ff78 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java @@ -74,11 +74,11 @@ public class ReplicatorTest extends JUnitSuite { } } - private static final class InternalChanged implements InternalMsg { - final Replicator.Changed chg; + private static final class InternalSubscribeResponse implements InternalMsg { + final Replicator.SubscribeResponse rsp; - InternalChanged(Replicator.Changed chg) { - this.chg = chg; + InternalSubscribeResponse(Replicator.SubscribeResponse rsp) { + this.rsp = rsp; } } @@ -102,7 +102,7 @@ public class ReplicatorTest extends JUnitSuite { node = DistributedData.get(ctx.getSystem()).selfUniqueAddress(); - this.replicatorAdapter.subscribe(this.key, InternalChanged::new); + this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new); } public static Behavior create(Key key) { @@ -121,7 +121,7 @@ public class ReplicatorTest extends JUnitSuite { .onMessage(GetValue.class, this::onGetValue) .onMessage(GetCachedValue.class, this::onGetCachedValue) .onMessage(InternalGetResponse.class, this::onInternalGetResponse) - .onMessage(InternalChanged.class, this::onInternalChanged) + .onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse) .build(); } @@ -136,7 +136,7 @@ public class ReplicatorTest extends JUnitSuite { curr -> curr.increment(node, 1)), InternalUpdateResponse::new); - return Behaviors.same(); + return this; } private Behavior onGetValue(GetValue cmd) { @@ -144,29 +144,34 @@ public class ReplicatorTest extends JUnitSuite { askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo), rsp -> new InternalGetResponse(rsp, cmd.replyTo)); - return Behaviors.same(); + return this; } private Behavior onGetCachedValue(GetCachedValue cmd) { cmd.replyTo.tell(cachedValue); - return Behaviors.same(); + return this; } private Behavior onInternalGetResponse(InternalGetResponse msg) { if (msg.rsp instanceof Replicator.GetSuccess) { int value = ((Replicator.GetSuccess) msg.rsp).get(key).getValue().intValue(); msg.replyTo.tell(value); - return Behaviors.same(); + return this; } else { // not dealing with failures return Behaviors.unhandled(); } } - private Behavior onInternalChanged(InternalChanged msg) { - GCounter counter = msg.chg.get(key); - cachedValue = counter.getValue().intValue(); - return this; + private Behavior onInternalSubscribeResponse(InternalSubscribeResponse msg) { + if (msg.rsp instanceof Replicator.Changed) { + GCounter counter = ((Replicator.Changed) msg.rsp).get(key); + cachedValue = counter.getValue().intValue(); + return this; + } else { + // no deletes + return Behaviors.unhandled(); + } } } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index c0ac375302..09db868db1 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -42,7 +42,7 @@ object ReplicatorSpec { private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int]) extends InternalMsg - private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg + private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalMsg def client(key: GCounterKey): Behavior[ClientCommand] = Behaviors.setup[ClientCommand] { ctx => @@ -50,7 +50,7 @@ object ReplicatorSpec { // adapter that turns the response messages from the replicator into our own protocol DistributedData.withReplicatorMessageAdapter[ClientCommand, GCounter] { replicatorAdapter => - replicatorAdapter.subscribe(key, InternalChanged.apply) + replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply) def behavior(cachedValue: Int): Behavior[ClientCommand] = { Behaviors.receiveMessage[ClientCommand] { @@ -84,9 +84,12 @@ object ReplicatorSpec { case InternalGetResponse(_, _) => Behaviors.unhandled // not dealing with failures - case InternalChanged(chg @ Replicator.Changed(`key`)) => + case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) => val value = chg.get(key).value.intValue behavior(value) + + case InternalSubscribeResponse(Replicator.Deleted(_)) => + Behaviors.unhandled // no deletes } } } @@ -155,9 +158,10 @@ object ReplicatorSpec { val key = GCounterKey("counter") getResponse match { - case GetSuccess(`key`) => - case GetFailure(`key`) => - case NotFound(`key`) => + case GetSuccess(`key`) => + case GetFailure(`key`) => + case NotFound(`key`) => + case GetDataDeleted(`key`) => } val updateResponse: UpdateResponse[GCounter] = ??? @@ -167,13 +171,20 @@ object ReplicatorSpec { case UpdateTimeout(`key`) => case StoreFailure(`key`) => case UpdateFailure(`key`) => + case UpdateDataDeleted(`key`) => } val deleteResponse: DeleteResponse[GCounter] = ??? deleteResponse match { - case DeleteSuccess(`key`) => - case ReplicationDeleteFailure(`key`) => - case DataDeleted(`key`) => + case DeleteSuccess(`key`) => + case DeleteFailure(`key`) => + case DataDeleted(`key`) => + } + + val subscribeResponse: SubscribeResponse[GCounter] = ??? + subscribeResponse match { + case Changed(`key`) => + case Deleted(`key`) => } val replicaCount: ReplicaCount = ??? diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 36810342ee..0eeee7aacf 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -540,6 +540,11 @@ object Replicator { extends GetResponse[A] with ReplicatorMessage + /** + * The [[Get]] request couldn't be performed because the entry has been deleted. + */ + final case class GetDataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends GetResponse[A] + /** * Register a subscriber that will be notified with a [[Changed]] message * when the value of the given `key` is changed. Current value is also @@ -563,12 +568,21 @@ object Replicator { */ final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage + /** + * @see [[Replicator.Subscribe]] + */ + sealed trait SubscribeResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + } + /** * The data value is retrieved with [[#get]] using the typed key. * * @see [[Replicator.Subscribe]] */ - final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage { + final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) + extends SubscribeResponse[A] + with ReplicatorMessage { /** * The data value, with correct type. @@ -585,9 +599,10 @@ object Replicator { def dataValue: A = data } - final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded { - override def toString: String = s"Deleted [$key]" - } + /** + * @see [[Replicator.Subscribe]] + */ + final case class Deleted[A <: ReplicatedData](key: Key[A]) extends SubscribeResponse[A] object Update { @@ -691,6 +706,11 @@ object Replicator { */ final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A] + /** + * The [[Update]] couldn't be performed because the entry has been deleted. + */ + final case class UpdateDataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateResponse[A] + /** * If the `modify` function of the [[Update]] throws an exception the reply message * will be this `ModifyFailure` message. The original exception is included as `cause`. @@ -1172,7 +1192,8 @@ object Replicator { * * A deleted key cannot be reused again, but it is still recommended to delete unused * data entries because that reduces the replication overhead when new nodes join the cluster. - * Subsequent `Delete`, `Update` and `Get` requests will be replied with [[Replicator.DataDeleted]]. + * Subsequent `Delete`, `Update` and `Get` requests will be replied with [[Replicator.DataDeleted]], + * [[Replicator.UpdateDataDeleted]] and [[Replicator.GetDataDeleted]] respectively. * Subscribers will receive [[Replicator.Deleted]]. * * In the `Delete` message you can pass an optional request context in the same way as for the @@ -1515,10 +1536,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = { val localValue = getData(key.id) - log.debug("Received Get for key [{}]", key) + log.debug("Received Get for key [{}].", key) if (isLocalGet(consistency)) { val reply = localValue match { - case Some(DataEnvelope(DeletedData, _, _)) => DataDeleted(key, req) + case Some(DataEnvelope(DeletedData, _, _)) => GetDataDeleted(key, req) case Some(DataEnvelope(data, _, _)) => GetSuccess(key, req)(data) case None => NotFound(key, req) } @@ -1559,7 +1580,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog Try { localValue match { - case Some(DataEnvelope(DeletedData, _, _)) => throw new DataDeleted(key, req) + case Some(envelope @ DataEnvelope(DeletedData, _, _)) => + (envelope, None) case Some(envelope @ DataEnvelope(existing, _, _)) => modify(Some(existing)) match { case d: DeltaReplicatedData if deltaCrdtEnabled => @@ -1575,8 +1597,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } } match { + case Success((DataEnvelope(DeletedData, _, _), _)) => + log.debug("Received Update for deleted key [{}].", key) + replyTo ! UpdateDataDeleted(key, req) case Success((envelope, delta)) => - log.debug("Received Update for key [{}]", key) + log.debug("Received Update for key [{}].", key) // handle the delta delta match { @@ -1628,9 +1653,6 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator))) } } - case Failure(e: DataDeleted[_]) => - log.debug("Received Update for deleted key [{}]", key) - replyTo ! e case Failure(e) => log.debug("Received Update for key [{}], failed: {}", key, e.getMessage) replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req) @@ -1859,7 +1881,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val isDebugEnabled = log.isDebugEnabled if (isDebugEnabled) log.debug( - "Received DeltaPropagation from [{}], containing [{}]", + "Received DeltaPropagation from [{}], containing [{}].", fromNode.address, deltas.collect { case (key, Delta(_, fromSeqNr, toSeqNr)) => s"$key $fromSeqNr-$toSeqNr" }.mkString(", ")) @@ -1960,7 +1982,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit = { if (log.isDebugEnabled) log.debug( - "Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", + "Received gossip status from [{}], chunk [{}] of [{}] containing [{}].", replyTo.path.address, (chunk + 1), totChunks, @@ -2008,7 +2030,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = { if (log.isDebugEnabled) - log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", ")) + log.debug("Received gossip from [{}], containing [{}].", replyTo.path.address, updatedData.keys.mkString(", ")) var replyData = Map.empty[KeyId, DataEnvelope] updatedData.foreach { case (key, envelope) => @@ -2545,7 +2567,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def waitReadRepairAck(envelope: Replicator.Internal.DataEnvelope): Receive = { case ReadRepairAck => val replyMsg = - if (envelope.data == DeletedData) DataDeleted(key, req) + if (envelope.data == DeletedData) GetDataDeleted(key, req) else GetSuccess(key, req)(envelope.data) replyTo.tell(replyMsg, context.parent) context.stop(self) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala index 851c432ed4..f143185386 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala @@ -86,7 +86,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult within(5.seconds) { awaitAssert { replicator ! Get(key, ReadLocal) - expectMsg(DataDeleted(key, None)) + expectMsg(GetDataDeleted(key, None)) } } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala index 0675fe6e14..1fdb126124 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala @@ -4,14 +4,17 @@ package akka.cluster.ddata +import scala.concurrent.Await import scala.concurrent.duration._ +import akka.pattern.ask import akka.cluster.Cluster import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ +import akka.util.Timeout import com.typesafe.config.ConfigFactory object ReplicatorSpec extends MultiNodeConfig { @@ -74,6 +77,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec enterBarrier("after-" + afterCounter) } + private implicit val askTimeout: Timeout = 5.seconds + def join(from: RoleName, to: RoleName): Unit = { runOn(from) { cluster.join(node(to).address) @@ -144,13 +149,24 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec expectMsg(DeleteSuccess(KeyX, Some(777))) changedProbe.expectMsg(Deleted(KeyX)) replicator ! Get(KeyX, ReadLocal, Some(789)) - expectMsg(DataDeleted(KeyX, Some(789))) + expectMsg(GetDataDeleted(KeyX, Some(789))) replicator ! Get(KeyX, readAll, Some(456)) - expectMsg(DataDeleted(KeyX, Some(456))) + expectMsg(GetDataDeleted(KeyX, Some(456))) + // verify ask-mapTo for GetDataDeleted, issue #27371 + Await + .result((replicator ? Get(KeyX, ReadLocal, None)).mapTo[GetResponse[GCounter]], askTimeout.duration) + .getClass should be(classOf[GetDataDeleted[GCounter]]) + replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1) - expectMsg(DataDeleted(KeyX, Some(123))) + expectMsg(UpdateDataDeleted(KeyX, Some(123))) replicator ! Delete(KeyX, WriteLocal, Some(555)) expectMsg(DataDeleted(KeyX, Some(555))) + // verify ask-mapTo for UpdateDataDeleted, issue #27371 + Await + .result( + (replicator ? Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1)).mapTo[UpdateResponse[GCounter]], + askTimeout.duration) + .getClass should be(classOf[UpdateDataDeleted[GCounter]]) replicator ! GetKeyIds expectMsg(GetKeyIdsResult(Set("A"))) @@ -309,7 +325,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec c.value should be(31) replicator ! Get(KeyY, ReadLocal, Some(777)) - expectMsg(DataDeleted(KeyY, Some(777))) + expectMsg(GetDataDeleted(KeyY, Some(777))) } } changedProbe.expectMsgPF() { case c @ Changed(KeyC) => c.get(KeyC).value } should be(31) diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 8e31d180f2..658f052081 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -32,14 +32,14 @@ After being deprecated since 2.5.0, the following have been removed in Akka 2.6. - Use `akka.testkit.javadsl.TestKit` instead. * `UntypedPersistentActor` - Use `AbstractPersistentActor` instead. -* `UntypedPersistentActorWithAtLeastOnceDelivery` +* `UntypedPersistentActorWithAtLeastOnceDelivery` - Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead. After being deprecated since 2.2, the following have been removed in Akka 2.6. -* `actorFor` +* `actorFor` - Use `ActorSelection` instead. - + ### Removed methods * `Logging.getLogger(UntypedActor)` `UntypedActor` has been removed, use `AbstractActor` instead. @@ -338,6 +338,8 @@ This is described further in @ref:[inspecting sharding state](../cluster-shardin ### Distributed Data +#### Config for message payload size + Configuration properties for controlling sizes of `Gossip` and `DeltaPropagation` messages in Distributed Data have been reduced. Previous defaults sometimes resulted in messages exceeding max payload size for remote actor messages. @@ -349,6 +351,17 @@ akka.cluster.distributed-data.max-delta-elements = 500 akka.cluster.distributed-data.delta-crdt.max-delta-size = 50 ``` +#### DataDeleted + +`DataDeleted` has been changed in its usage. While it is still a possible response to a Delete request, +it is no longer the response when an `Update` or `Get` request couldn't be performed because the entry has been deleted. +In its place are two new possible responses to a request, `UpdateDataDeleted` for an `Update` and `GetDataDeleted` +for a `Get`. + +The reason for this change is that `DataDeleted` didn't extend the `UpdateResponse` and `GetResponse` types +and could therefore cause problems when `Update` and `Get` were used with `ask`. This was also a problem for +Akka Typed. + ### CoordinatedShutdown is run from ActorSystem.terminate No migration is needed but it is mentioned here because it is a change in behavior. @@ -379,7 +392,7 @@ down. It is no longer required to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. -Additionally when downstream of the IO-sources cancels with a failure, the materialized value +Additionally when downstream of the IO-sources cancels with a failure, the materialized value is failed with that failure rather than completed successfully. ### Akka now uses Fork Join Pool from JDK @@ -424,7 +437,7 @@ The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no ### Naming convention changed In needing a way to distinguish the new APIs in code and docs from the original, Akka used the naming -convention `untyped`. All references of the original have now been changed to `classic`. The +convention `untyped`. All references of the original have now been changed to `classic`. The reference of the new APIs as `typed` is going away as it becomes the primary APIs. ### Receptionist has moved @@ -480,6 +493,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible * Akka Typed is now using SLF4J as the logging API. @scala[`ActorContext.log`]@java[`ActorContext.getLog`] returns an `org.slf4j.Logger`. MDC has been changed to only support `String` values. * `setLoggerClass` in `ActorContext` has been renamed to `setLoggerName`. +* `GetDataDeleted` and `UpdateDataDeleted` introduced as described in @ref[DataDeleted](#datadeleted). +* `SubscribeResponse` introduced in `Subscribe` because the responses can be both `Changed` and `Deleted`. +* `ReplicationDeleteFailure` renamed to `DeleteFailure`. #### Akka Typed Stream API changes @@ -497,22 +513,22 @@ for Scala an implicit materializer is provided if there is an implicit `ActorSys materializers and simplifies most stream use cases somewhat. The `ActorMaterializer` factories has been deprecated and replaced with a few corresponding factories in `akka.stream.Materializer`. -New factories with per-materializer settings has not been provided but should instead be done globally through config or per stream, -see below for more details. +New factories with per-materializer settings has not been provided but should instead be done globally through config or per stream, +see below for more details. -Having a default materializer available means that most, if not all, usages of Java `ActorMaterializer.create()` -and Scala `implicit val materializer = ActorMaterializer()` should be removed. +Having a default materializer available means that most, if not all, usages of Java `ActorMaterializer.create()` +and Scala `implicit val materializer = ActorMaterializer()` should be removed. Details about the stream materializer can be found in [Actor Materializer Lifecycle](../stream/stream-flows-and-basics.md#actor-materializer-lifecycle) -When using streams from typed the same factories and methods for creating materializers and running streams as from classic can now be used with typed. The +When using streams from typed the same factories and methods for creating materializers and running streams as from classic can now be used with typed. The `akka.stream.typed.scaladsl.ActorMaterializer` and `akka.stream.typed.javadsl.ActorMaterializerFactory` that previously existed in the `akka-stream-typed` module has been removed. ### Materializer settings deprecated The `ActorMaterializerSettings` class has been deprecated. -All materializer settings are available as configuration to change the system default or through attributes that can be +All materializer settings are available as configuration to change the system default or through attributes that can be used for individual streams when they are materialized. | Materializer setting | Corresponding attribute | Setting | @@ -546,18 +562,18 @@ Java ### Stream cancellation available upstream -Previously an Akka streams stage or operator failed it was impossible to discern this from +Previously an Akka streams stage or operator failed it was impossible to discern this from the stage just cancelling. This has been improved so that when a stream stage fails the cause will be propagated upstream. The following operators have a slight change in behavior because of this: -* `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with +* `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with an `IOOperationIncompleteException` when downstream fails * `.watchTermination` will fail the materialized `Future` or `CompletionStage` rather than completing it when downstream fails -* `StreamRef` - `SourceRef` will cancel with a failure when the receiving node is downed +* `StreamRef` - `SourceRef` will cancel with a failure when the receiving node is downed This also means that custom `GraphStage` implementations should be changed to pass on the -cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature -taking a `cause` parameter and calling `cancelStage(cause)` to pass the cause upstream. The old zero-argument -`onDownstreamFinish` method has been deprecated. +cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature +taking a `cause` parameter and calling `cancelStage(cause)` to pass the cause upstream. The old zero-argument +`onDownstreamFinish` method has been deprecated.