From 65cba329d06396f08d1718056ada31c0f4afcb9c Mon Sep 17 00:00:00 2001 From: inakov Date: Sat, 14 Jan 2017 14:30:58 +0200 Subject: [PATCH] Updated DistributedData Delete API to include optional request context. #20140 --- .../scala/akka/cluster/ddata/Replicator.scala | 72 +++++++++++++------ .../cluster/ddata/ReplicatorChaosSpec.scala | 4 +- .../akka/cluster/ddata/ReplicatorSpec.scala | 30 ++++---- .../project/migration-guide-2.4.x-2.5.x.rst | 6 ++ akka-docs/rst/scala/distributed-data.rst | 6 +- project/MiMa.scala | 18 ++++- 6 files changed, 95 insertions(+), 41 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index e08f0f7bac..54b51b8054 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -318,7 +318,7 @@ object Replicator { * * The subscriber will automatically be unregistered if it is terminated. * - * If the key is deleted the subscriber is notified with a [[DataDeleted]] + * If the key is deleted the subscriber is notified with a [[Deleted]] * message. */ final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage @@ -349,6 +349,10 @@ object Replicator { def dataValue: A = data } + final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded { + override def toString: String = s"Deleted [$key]" + } + object Update { /** @@ -458,20 +462,40 @@ object Replicator { * It will eventually be disseminated to other replicas, unless the local replica * crashes before it has been able to communicate with other replicas. */ - final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A] with DeleteResponse[A] + final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) + extends UpdateFailure[A] with DeleteResponse[A] { + + /** Java API */ + override def getRequest: Optional[Any] = Optional.ofNullable(request.orNull) + } /** * Send this message to the local `Replicator` to delete a data value for the * given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages. + * + * The optional `request` context is included in the reply messages. This is a convenient + * way to pass contextual information (e.g. original sender) without having to use `ask` + * or maintain local correlation data structures. */ - final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency) extends Command[A] + final case class Delete[A <: ReplicatedData](key: Key[A], consistency: WriteConsistency, request: Option[Any] = None) + extends Command[A] with NoSerializationVerificationNeeded { - sealed trait DeleteResponse[A <: ReplicatedData] { - def key: Key[A] + def this(key: Key[A], consistency: WriteConsistency) = this(key, consistency, None) + + def this(key: Key[A], consistency: WriteConsistency, request: Optional[Any]) = + this(key, consistency, Option(request.orElse(null))) } - final case class DeleteSuccess[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] - final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A] - final case class DataDeleted[A <: ReplicatedData](key: Key[A]) + + sealed trait DeleteResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { + def key: Key[A] + def request: Option[Any] + + /** Java API*/ + def getRequest: Optional[Any] = Optional.ofNullable(request.orNull) + } + final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A] + final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A] + final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends RuntimeException with NoStackTrace with DeleteResponse[A] { override def toString: String = s"DataDeleted [$key]" } @@ -752,7 +776,11 @@ object Replicator { * A deleted key cannot be reused again, but it is still recommended to delete unused * data entries because that reduces the replication overhead when new nodes join the cluster. * Subsequent `Delete`, `Update` and `Get` requests will be replied with [[Replicator.DataDeleted]]. - * Subscribers will receive [[Replicator.DataDeleted]]. + * Subscribers will receive [[Replicator.Deleted]]. + * + * In the `Delete` message you can pass an optional request context in the same way as for the + * `Update` message, described above. For example the original sender can be passed and replied + * to after receiving and transforming `DeleteSuccess`. * * == CRDT Garbage == * @@ -951,7 +979,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case LeaderChanged(leader) ⇒ receiveLeaderChanged(leader, None) case RoleLeaderChanged(role, leader) ⇒ receiveLeaderChanged(leader, Some(role)) case GetKeyIds ⇒ receiveGetKeyIds() - case Delete(key, consistency) ⇒ receiveDelete(key, consistency) + case Delete(key, consistency, req) ⇒ receiveDelete(key, consistency, req) case RemovedNodePruningTick ⇒ receiveRemovedNodePruningTick() case GetReplicaCount ⇒ receiveGetReplicaCount() } @@ -961,7 +989,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog log.debug("Received Get for key [{}], local data [{}]", key, localValue) if (isLocalGet(consistency)) { val reply = localValue match { - case Some(DataEnvelope(DeletedData, _)) ⇒ DataDeleted(key) + case Some(DataEnvelope(DeletedData, _)) ⇒ DataDeleted(key, req) case Some(DataEnvelope(data, _)) ⇒ GetSuccess(key, req)(data) case None ⇒ NotFound(key, req) } @@ -989,7 +1017,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val localValue = getData(key.id) Try { localValue match { - case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key) + case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req) case Some(envelope @ DataEnvelope(existing, _)) ⇒ existing.merge(modify(Some(existing)).asInstanceOf[existing.T]) case None ⇒ modify(None) @@ -1082,27 +1110,27 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog sender() ! GetKeyIdsResult(keys) } - def receiveDelete(key: KeyR, consistency: WriteConsistency): Unit = { + def receiveDelete(key: KeyR, consistency: WriteConsistency, req: Option[Any]): Unit = { getData(key.id) match { case Some(DataEnvelope(DeletedData, _)) ⇒ // already deleted - sender() ! DataDeleted(key) + sender() ! DataDeleted(key, req) case _ ⇒ setData(key.id, DeletedEnvelope) val durable = isDurable(key.id) if (isLocalUpdate(consistency)) { if (durable) durableStore ! Store(key.id, DeletedData, - Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), sender()))) + Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), sender()))) else - sender() ! DeleteSuccess(key) + sender() ! DeleteSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, unreachable, sender(), durable) + context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, sender(), durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, DeletedData, - Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), writeAggregator))) + Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), writeAggregator))) } } } @@ -1147,7 +1175,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val key = subscriptionKeys(keyId) getData(keyId) match { case Some(envelope) ⇒ - val msg = if (envelope.data == DeletedData) DataDeleted(key) else Changed(key)(envelope.data) + val msg = if (envelope.data == DeletedData) Deleted(key) else Changed(key)(envelope.data) subs.foreach { _ ! msg } case None ⇒ } @@ -1600,9 +1628,9 @@ private[akka] class WriteAggregator( val isTimeoutOrNotEnoughNodes = isTimeout || notEnoughNodes || gotWriteNackFrom.isEmpty val replyMsg = - if (isSuccess && isDelete) DeleteSuccess(key) + if (isSuccess && isDelete) DeleteSuccess(key, req) else if (isSuccess) UpdateSuccess(key, req) - else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key) + else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req) else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req) else StoreFailure(key, req) @@ -1702,7 +1730,7 @@ private[akka] class ReadAggregator( def waitReadRepairAck(envelope: Replicator.Internal.DataEnvelope): Receive = { case ReadRepairAck ⇒ val replyMsg = - if (envelope.data == DeletedData) DataDeleted(key) + if (envelope.data == DeletedData) DataDeleted(key, req) else GetSuccess(key, req)(envelope.data) replyTo.tell(replyMsg, context.parent) context.stop(self) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala index 517ce5d2aa..e4495b2b8a 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala @@ -82,7 +82,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult within(5.seconds) { awaitAssert { replicator ! Get(key, ReadLocal) - expectMsg(DataDeleted(key)) + expectMsg(DataDeleted(key, None)) } } @@ -141,7 +141,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult replicator ! Update(KeyX, GCounter(), WriteTo(2, timeout))(_ + 50) expectMsg(UpdateSuccess(KeyX, None)) replicator ! Delete(KeyX, WriteLocal) - expectMsg(DeleteSuccess(KeyX)) + expectMsg(DeleteSuccess(KeyX, None)) } enterBarrier("initial-updates-done") diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala index e58cb02d23..e4bbca4c7d 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala @@ -136,17 +136,17 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 9) expectMsg(UpdateSuccess(KeyX, None)) changedProbe.expectMsg(Changed(KeyX)(c9)).dataValue should be(c9) - replicator ! Delete(KeyX, WriteLocal) - expectMsg(DeleteSuccess(KeyX)) - changedProbe.expectMsg(DataDeleted(KeyX)) - replicator ! Get(KeyX, ReadLocal) - expectMsg(DataDeleted(KeyX)) - replicator ! Get(KeyX, readAll) - expectMsg(DataDeleted(KeyX)) - replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 1) - expectMsg(DataDeleted(KeyX)) - replicator ! Delete(KeyX, WriteLocal) - expectMsg(DataDeleted(KeyX)) + replicator ! Delete(KeyX, WriteLocal, Some(777)) + expectMsg(DeleteSuccess(KeyX, Some(777))) + changedProbe.expectMsg(Deleted(KeyX)) + replicator ! Get(KeyX, ReadLocal, Some(789)) + expectMsg(DataDeleted(KeyX, Some(789))) + replicator ! Get(KeyX, readAll, Some(456)) + expectMsg(DataDeleted(KeyX, Some(456))) + replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ + 1) + expectMsg(DataDeleted(KeyX, Some(123))) + replicator ! Delete(KeyX, WriteLocal, Some(555)) + expectMsg(DataDeleted(KeyX, Some(555))) replicator ! GetKeyIds expectMsg(GetKeyIdsResult(Set("A"))) @@ -288,8 +288,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec expectMsg(UpdateSuccess(KeyC, None)) changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31) - replicator ! Delete(KeyY, WriteLocal) - expectMsg(DeleteSuccess(KeyY)) + replicator ! Delete(KeyY, WriteLocal, Some(777)) + expectMsg(DeleteSuccess(KeyY, Some(777))) replicator ! Get(KeyZ, readMajority) expectMsgPF() { case g @ GetSuccess(KeyZ, _) ⇒ g.get(KeyZ).value } should be(30) @@ -304,8 +304,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val c = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) } c.value should be(31) - replicator ! Get(KeyY, ReadLocal) - expectMsg(DataDeleted(KeyY)) + replicator ! Get(KeyY, ReadLocal, Some(777)) + expectMsg(DataDeleted(KeyY, Some(777))) } } changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31) diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 8e17736b39..b257933a1d 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -184,3 +184,9 @@ Java developers should use `<>` instead of `[]`, e.g: `PNCounterMap`. **NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long as you use String as key type). This means that 2.4 nodes can synchronize with 2.5 nodes.** + +Subscribers +----------- + +When an entity is removed subscribers will not receive ``Replicator.DataDeleted`` any more. +They will receive ``Replicator.Deleted`` instead. diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 20235d05d5..ae217f3e88 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -244,7 +244,11 @@ to all nodes. A deleted key cannot be reused again, but it is still recommended to delete unused data entries because that reduces the replication overhead when new nodes join the cluster. Subsequent ``Delete``, ``Update`` and ``Get`` requests will be replied with ``Replicator.DataDeleted``. -Subscribers will receive ``Replicator.DataDeleted``. +Subscribers will receive ``Replicator.Deleted``. + +In the `Delete` message you can pass an optional request context in the same way as for the +`Update` message, described above. For example the original sender can be passed and replied +to after receiving and transforming `DeleteSuccess`. .. includecode:: code/docs/ddata/DistributedDataDocSpec.scala#delete diff --git a/project/MiMa.scala b/project/MiMa.scala index 014e78dad4..6e8b506c3d 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -163,8 +163,24 @@ object MiMa extends AutoPlugin { FilterAnyProblemStartingWith("akka.cluster.ddata.ORMap"), FilterAnyProblemStartingWith("akka.cluster.ddata.LWWMap"), FilterAnyProblemStartingWith("akka.cluster.ddata.PNCounterMap"), - FilterAnyProblemStartingWith("akka.cluster.ddata.ORMultiMap") + FilterAnyProblemStartingWith("akka.cluster.ddata.ORMultiMap"), + // #20140 durable distributed data + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReplicationDeleteFailure.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DeleteSuccess.apply"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.Replicator#DeleteResponse.getRequest"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.Replicator#DeleteResponse.request"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.Replicator#Command.request"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveDelete"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReplicationDeleteFailure.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReplicationDeleteFailure.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DeleteSuccess.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DeleteSuccess.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.copy") ) Map(