API changes related to DataDeleted, #27371

* The reason for this change is that `DataDeleted` didn't extend the
  `UpdateResponse` and `GetResponse` types and could therefore cause problems
  when `Update` and `Get` were used with `ask`. This was also a problem for
  Akka Typed.
* Introduce new messages types UpdateDataDeleted and GetDataDeleted
* Introduce SubscribeResponse because the responses can be both `Changed`
  and `Deleted` are responses to subscriptions. Important for Typed.
This commit is contained in:
Patrik Nordwall 2019-08-16 07:33:41 +02:00
parent 4576835cce
commit 1d09d2725b
11 changed files with 228 additions and 97 deletions

View file

@ -25,9 +25,9 @@ import akka.actor.typed.Terminated
import akka.cluster.ddata.typed.javadsl.{ Replicator => JReplicator }
import akka.cluster.ddata.typed.scaladsl.{ Replicator => SReplicator }
private case class InternalChanged[A <: ReplicatedData](
chg: dd.Replicator.Changed[A],
subscriber: ActorRef[JReplicator.Changed[A]])
private case class InternalSubscribeResponse[A <: ReplicatedData](
chg: dd.Replicator.SubscribeResponse[A],
subscriber: ActorRef[JReplicator.SubscribeResponse[A]])
extends JReplicator.Command
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
@ -82,8 +82,9 @@ import akka.actor.typed.Terminated
.map {
case rsp: dd.Replicator.GetSuccess[d] =>
JReplicator.GetSuccess(rsp.key)(rsp.dataValue)
case rsp: dd.Replicator.NotFound[d] => JReplicator.NotFound(rsp.key)
case rsp: dd.Replicator.GetFailure[d] => JReplicator.GetFailure(rsp.key)
case rsp: dd.Replicator.NotFound[d] => JReplicator.NotFound(rsp.key)
case rsp: dd.Replicator.GetFailure[d] => JReplicator.GetFailure(rsp.key)
case rsp: dd.Replicator.GetDataDeleted[d] => JReplicator.GetDataDeleted(rsp.key)
}
.recover {
case _ => JReplicator.GetFailure(cmd.key)
@ -111,7 +112,8 @@ import akka.actor.typed.Terminated
case rsp: dd.Replicator.UpdateTimeout[d] => JReplicator.UpdateTimeout(rsp.key)
case rsp: dd.Replicator.ModifyFailure[d] =>
JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause)
case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key)
case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key)
case rsp: dd.Replicator.UpdateDataDeleted[d] => JReplicator.UpdateDataDeleted(rsp.key)
}
.recover {
case _ => JReplicator.UpdateTimeout(cmd.key)
@ -130,8 +132,9 @@ import akka.actor.typed.Terminated
// For the Java API the Changed messages must be mapped to the JReplicator.Changed class.
// That is done with an adapter, and we have to keep track of the lifecycle of the original
// subscriber and stop the adapter when the original subscriber is stopped.
val adapter: ActorRef[dd.Replicator.Changed[ReplicatedData]] = ctx.spawnMessageAdapter { chg =>
InternalChanged(chg, cmd.subscriber)
val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter {
rsp =>
InternalSubscribeResponse(rsp, cmd.subscriber)
}
classicReplicator.tell(
@ -142,8 +145,11 @@ import akka.actor.typed.Terminated
withState(subscribeAdapters.updated(cmd.subscriber, adapter))
case InternalChanged(chg, subscriber) =>
subscriber ! JReplicator.Changed(chg.key)(chg.dataValue)
case InternalSubscribeResponse(rsp, subscriber) =>
rsp match {
case chg: dd.Replicator.Changed[_] => subscriber ! JReplicator.Changed(chg.key)(chg.dataValue)
case del: dd.Replicator.Deleted[_] => subscriber ! JReplicator.Deleted(del.key)
}
Behaviors.same
case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked =>
@ -165,12 +171,12 @@ import akka.actor.typed.Terminated
.map {
case rsp: dd.Replicator.DeleteSuccess[d] => JReplicator.DeleteSuccess(rsp.key)
case rsp: dd.Replicator.ReplicationDeleteFailure[d] =>
JReplicator.ReplicationDeleteFailure(rsp.key)
JReplicator.DeleteFailure(rsp.key)
case rsp: dd.Replicator.DataDeleted[d] => JReplicator.DataDeleted(rsp.key)
case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key)
}
.recover {
case _ => JReplicator.ReplicationDeleteFailure(cmd.key)
case _ => JReplicator.DeleteFailure(cmd.key)
}
reply.foreach { cmd.replyTo ! _ }
Behaviors.same

View file

@ -7,8 +7,6 @@ package akka.cluster.ddata.typed.javadsl
import java.time.Duration
import java.util.function.{ Function => JFunction }
import scala.util.control.NoStackTrace
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.DeadLetterSuppression
@ -152,6 +150,11 @@ object Replicator {
*/
final case class GetFailure[A <: ReplicatedData](key: Key[A]) extends GetResponse[A]
/**
* The [[Get]] request couldn't be performed because the entry has been deleted.
*/
final case class GetDataDeleted[A <: ReplicatedData](key: Key[A]) extends GetResponse[A]
object Update {
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A => A): Option[A] => A = {
@ -215,6 +218,11 @@ object Replicator {
*/
final case class UpdateTimeout[A <: ReplicatedData](key: Key[A]) extends UpdateFailure[A]
/**
* The [[Update]] couldn't be performed because the entry has been deleted.
*/
final case class UpdateDataDeleted[A <: ReplicatedData](key: Key[A]) extends UpdateResponse[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
@ -250,21 +258,30 @@ object Replicator {
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]])
extends Command
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[SubscribeResponse[A]])
extends Command
/**
* @see [[Replicator.Subscribe]]
*/
sealed trait SubscribeResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
}
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) {
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends SubscribeResponse[A] {
/**
* The data value, with correct type.
@ -281,6 +298,11 @@ object Replicator {
def dataValue: A = data
}
/**
* @see [[Replicator.Subscribe]]
*/
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends SubscribeResponse[A]
/**
* Send this message to the local `Replicator` to delete a data value for the
* given `key`. The `Replicator` will reply with one of the [[DeleteResponse]] messages.
@ -296,13 +318,8 @@ object Replicator {
def key: Key[A]
}
final case class DeleteSuccess[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A]
final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A]
final case class DataDeleted[A <: ReplicatedData](key: Key[A])
extends RuntimeException
with NoStackTrace
with DeleteResponse[A] {
override def toString: String = s"DataDeleted [$key]"
}
final case class DeleteFailure[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A]
final case class DataDeleted[A <: ReplicatedData](key: Key[A]) extends DeleteResponse[A]
/**
* Get current number of replicas, including the local replica.

View file

@ -57,21 +57,21 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
private implicit val askTimeout: Timeout = Timeout(unexpectedAskTimeout.asScala)
private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.Changed[B]]] = Map.empty
private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.SubscribeResponse[B]]] = Map.empty
/**
* Subscribe to changes of the given `key`. The [[Replicator.Changed]] messages from
* Subscribe to changes of the given `key`. The [[Replicator.Changed]] and [[Replicator.Deleted]] messages from
* the replicator are transformed to the message protocol of the requesting actor with
* the given `responseAdapter` function.
*/
def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.Changed[B], A]): Unit = {
def subscribe(key: Key[B], responseAdapter: JFunction[Replicator.SubscribeResponse[B], A]): Unit = {
// unsubscribe in case it's called more than once per key
unsubscribe(key)
changedMessageAdapters.get(key).foreach { subscriber =>
replicator ! Replicator.Unsubscribe(key, subscriber)
}
val replyTo: ActorRef[Replicator.Changed[B]] =
context.messageAdapter(classOf[Replicator.Changed[B]], responseAdapter)
val replyTo: ActorRef[Replicator.SubscribeResponse[B]] =
context.messageAdapter(classOf[Replicator.SubscribeResponse[B]], responseAdapter)
changedMessageAdapters = changedMessageAdapters.updated(key, replyTo)
replicator ! Replicator.Subscribe(key, replyTo)
}

View file

@ -86,6 +86,15 @@ object Replicator {
def unapply[A <: ReplicatedData](rsp: GetFailure[A]): Option[Key[A]] = Some(rsp.key)
}
/**
* The [[Get]] request couldn't be performed because the entry has been deleted.
*/
type GetDataDeleted[A <: ReplicatedData] = dd.Replicator.GetDataDeleted[A]
object GetDataDeleted {
def unapply[A <: ReplicatedData](rsp: GetDataDeleted[A]): Option[Key[A]] =
Some(rsp.key)
}
object Update {
/**
@ -165,6 +174,15 @@ object Replicator {
Some(rsp.key)
}
/**
* The [[Update]] couldn't be performed because the entry has been deleted.
*/
type UpdateDataDeleted[A <: ReplicatedData] = dd.Replicator.UpdateDataDeleted[A]
object UpdateDataDeleted {
def unapply[A <: ReplicatedData](rsp: UpdateDataDeleted[A]): Option[Key[A]] =
Some(rsp.key)
}
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
@ -214,6 +232,16 @@ object Replicator {
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
/**
* @see [[Replicator.Subscribe]]
*/
type SubscribeResponse[A <: ReplicatedData] = dd.Replicator.SubscribeResponse[A]
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
object Changed {
def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key)
}
@ -225,6 +253,15 @@ object Replicator {
*/
type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A]
object Deleted {
def unapply[A <: ReplicatedData](del: Deleted[A]): Option[Key[A]] = Some(del.key)
}
/**
* @see [[Replicator.Subscribe]]
*/
type Deleted[A <: ReplicatedData] = dd.Replicator.Deleted[A]
object Delete {
/**
@ -253,9 +290,9 @@ object Replicator {
def unapply[A <: ReplicatedData](rsp: DeleteSuccess[A]): Option[Key[A]] =
Some(rsp.key)
}
type ReplicationDeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A]
object ReplicationDeleteFailure {
def unapply[A <: ReplicatedData](rsp: ReplicationDeleteFailure[A]): Option[Key[A]] =
type DeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A]
object DeleteFailure {
def unapply[A <: ReplicatedData](rsp: DeleteFailure[A]): Option[Key[A]] =
Some(rsp.key)
}
type DataDeleted[A <: ReplicatedData] = dd.Replicator.DataDeleted[A]

View file

@ -61,20 +61,21 @@ class ReplicatorMessageAdapter[A, B <: ReplicatedData](
private implicit val askTimeout: Timeout = Timeout(unexpectedAskTimeout)
private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.Changed[B]]] = Map.empty
private var changedMessageAdapters: Map[Key[B], ActorRef[Replicator.SubscribeResponse[B]]] = Map.empty
/**
* Subscribe to changes of the given `key`. The [[Replicator.Changed]] messages from
* Subscribe to changes of the given `key`. The [[Replicator.Changed]] and [[Replicator.Deleted]] messages from
* the replicator are transformed to the message protocol of the requesting actor with
* the given `responseAdapter` function.
*/
def subscribe(key: Key[B], responseAdapter: Replicator.Changed[B] => A): Unit = {
def subscribe(key: Key[B], responseAdapter: Replicator.SubscribeResponse[B] => A): Unit = {
// unsubscribe in case it's called more than once per key
unsubscribe(key)
changedMessageAdapters.get(key).foreach { subscriber =>
replicator ! Replicator.Unsubscribe(key, subscriber)
}
val replyTo: ActorRef[Replicator.Changed[B]] = context.messageAdapter[Replicator.Changed[B]](responseAdapter)
val replyTo: ActorRef[Replicator.SubscribeResponse[B]] =
context.messageAdapter[Replicator.SubscribeResponse[B]](responseAdapter)
changedMessageAdapters = changedMessageAdapters.updated(key, replyTo)
replicator ! Replicator.Subscribe(key, replyTo)
}

View file

@ -74,11 +74,11 @@ public class ReplicatorTest extends JUnitSuite {
}
}
private static final class InternalChanged implements InternalMsg {
final Replicator.Changed<GCounter> chg;
private static final class InternalSubscribeResponse implements InternalMsg {
final Replicator.SubscribeResponse<GCounter> rsp;
InternalChanged(Replicator.Changed<GCounter> chg) {
this.chg = chg;
InternalSubscribeResponse(Replicator.SubscribeResponse<GCounter> rsp) {
this.rsp = rsp;
}
}
@ -102,7 +102,7 @@ public class ReplicatorTest extends JUnitSuite {
node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
this.replicatorAdapter.subscribe(this.key, InternalChanged::new);
this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new);
}
public static Behavior<ClientCommand> create(Key<GCounter> key) {
@ -121,7 +121,7 @@ public class ReplicatorTest extends JUnitSuite {
.onMessage(GetValue.class, this::onGetValue)
.onMessage(GetCachedValue.class, this::onGetCachedValue)
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
.onMessage(InternalChanged.class, this::onInternalChanged)
.onMessage(InternalSubscribeResponse.class, this::onInternalSubscribeResponse)
.build();
}
@ -136,7 +136,7 @@ public class ReplicatorTest extends JUnitSuite {
curr -> curr.increment(node, 1)),
InternalUpdateResponse::new);
return Behaviors.same();
return this;
}
private Behavior<ClientCommand> onGetValue(GetValue cmd) {
@ -144,29 +144,34 @@ public class ReplicatorTest extends JUnitSuite {
askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo),
rsp -> new InternalGetResponse(rsp, cmd.replyTo));
return Behaviors.same();
return this;
}
private Behavior<ClientCommand> onGetCachedValue(GetCachedValue cmd) {
cmd.replyTo.tell(cachedValue);
return Behaviors.same();
return this;
}
private Behavior<ClientCommand> onInternalGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
msg.replyTo.tell(value);
return Behaviors.same();
return this;
} else {
// not dealing with failures
return Behaviors.unhandled();
}
}
private Behavior<ClientCommand> onInternalChanged(InternalChanged msg) {
GCounter counter = msg.chg.get(key);
cachedValue = counter.getValue().intValue();
return this;
private Behavior<ClientCommand> onInternalSubscribeResponse(InternalSubscribeResponse msg) {
if (msg.rsp instanceof Replicator.Changed) {
GCounter counter = ((Replicator.Changed<?>) msg.rsp).get(key);
cachedValue = counter.getValue().intValue();
return this;
} else {
// no deletes
return Behaviors.unhandled();
}
}
}

View file

@ -42,7 +42,7 @@ object ReplicatorSpec {
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
extends InternalMsg
private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg
private case class InternalSubscribeResponse(chg: Replicator.SubscribeResponse[GCounter]) extends InternalMsg
def client(key: GCounterKey): Behavior[ClientCommand] =
Behaviors.setup[ClientCommand] { ctx =>
@ -50,7 +50,7 @@ object ReplicatorSpec {
// adapter that turns the response messages from the replicator into our own protocol
DistributedData.withReplicatorMessageAdapter[ClientCommand, GCounter] { replicatorAdapter =>
replicatorAdapter.subscribe(key, InternalChanged.apply)
replicatorAdapter.subscribe(key, InternalSubscribeResponse.apply)
def behavior(cachedValue: Int): Behavior[ClientCommand] = {
Behaviors.receiveMessage[ClientCommand] {
@ -84,9 +84,12 @@ object ReplicatorSpec {
case InternalGetResponse(_, _) =>
Behaviors.unhandled // not dealing with failures
case InternalChanged(chg @ Replicator.Changed(`key`)) =>
case InternalSubscribeResponse(chg @ Replicator.Changed(`key`)) =>
val value = chg.get(key).value.intValue
behavior(value)
case InternalSubscribeResponse(Replicator.Deleted(_)) =>
Behaviors.unhandled // no deletes
}
}
}
@ -155,9 +158,10 @@ object ReplicatorSpec {
val key = GCounterKey("counter")
getResponse match {
case GetSuccess(`key`) =>
case GetFailure(`key`) =>
case NotFound(`key`) =>
case GetSuccess(`key`) =>
case GetFailure(`key`) =>
case NotFound(`key`) =>
case GetDataDeleted(`key`) =>
}
val updateResponse: UpdateResponse[GCounter] = ???
@ -167,13 +171,20 @@ object ReplicatorSpec {
case UpdateTimeout(`key`) =>
case StoreFailure(`key`) =>
case UpdateFailure(`key`) =>
case UpdateDataDeleted(`key`) =>
}
val deleteResponse: DeleteResponse[GCounter] = ???
deleteResponse match {
case DeleteSuccess(`key`) =>
case ReplicationDeleteFailure(`key`) =>
case DataDeleted(`key`) =>
case DeleteSuccess(`key`) =>
case DeleteFailure(`key`) =>
case DataDeleted(`key`) =>
}
val subscribeResponse: SubscribeResponse[GCounter] = ???
subscribeResponse match {
case Changed(`key`) =>
case Deleted(`key`) =>
}
val replicaCount: ReplicaCount = ???

View file

@ -540,6 +540,11 @@ object Replicator {
extends GetResponse[A]
with ReplicatorMessage
/**
* The [[Get]] request couldn't be performed because the entry has been deleted.
*/
final case class GetDataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends GetResponse[A]
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
@ -563,12 +568,21 @@ object Replicator {
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/**
* @see [[Replicator.Subscribe]]
*/
sealed trait SubscribeResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
}
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A)
extends SubscribeResponse[A]
with ReplicatorMessage {
/**
* The data value, with correct type.
@ -585,9 +599,10 @@ object Replicator {
def dataValue: A = data
}
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
override def toString: String = s"Deleted [$key]"
}
/**
* @see [[Replicator.Subscribe]]
*/
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends SubscribeResponse[A]
object Update {
@ -691,6 +706,11 @@ object Replicator {
*/
final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]
/**
* The [[Update]] couldn't be performed because the entry has been deleted.
*/
final case class UpdateDataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateResponse[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
@ -1172,7 +1192,8 @@ object Replicator {
*
* A deleted key cannot be reused again, but it is still recommended to delete unused
* data entries because that reduces the replication overhead when new nodes join the cluster.
* Subsequent `Delete`, `Update` and `Get` requests will be replied with [[Replicator.DataDeleted]].
* Subsequent `Delete`, `Update` and `Get` requests will be replied with [[Replicator.DataDeleted]],
* [[Replicator.UpdateDataDeleted]] and [[Replicator.GetDataDeleted]] respectively.
* Subscribers will receive [[Replicator.Deleted]].
*
* In the `Delete` message you can pass an optional request context in the same way as for the
@ -1515,10 +1536,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = {
val localValue = getData(key.id)
log.debug("Received Get for key [{}]", key)
log.debug("Received Get for key [{}].", key)
if (isLocalGet(consistency)) {
val reply = localValue match {
case Some(DataEnvelope(DeletedData, _, _)) => DataDeleted(key, req)
case Some(DataEnvelope(DeletedData, _, _)) => GetDataDeleted(key, req)
case Some(DataEnvelope(data, _, _)) => GetSuccess(key, req)(data)
case None => NotFound(key, req)
}
@ -1559,7 +1580,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
Try {
localValue match {
case Some(DataEnvelope(DeletedData, _, _)) => throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(DeletedData, _, _)) =>
(envelope, None)
case Some(envelope @ DataEnvelope(existing, _, _)) =>
modify(Some(existing)) match {
case d: DeltaReplicatedData if deltaCrdtEnabled =>
@ -1575,8 +1597,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
} match {
case Success((DataEnvelope(DeletedData, _, _), _)) =>
log.debug("Received Update for deleted key [{}].", key)
replyTo ! UpdateDataDeleted(key, req)
case Success((envelope, delta)) =>
log.debug("Received Update for key [{}]", key)
log.debug("Received Update for key [{}].", key)
// handle the delta
delta match {
@ -1628,9 +1653,6 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
case Failure(e: DataDeleted[_]) =>
log.debug("Received Update for deleted key [{}]", key)
replyTo ! e
case Failure(e) =>
log.debug("Received Update for key [{}], failed: {}", key, e.getMessage)
replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req)
@ -1859,7 +1881,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val isDebugEnabled = log.isDebugEnabled
if (isDebugEnabled)
log.debug(
"Received DeltaPropagation from [{}], containing [{}]",
"Received DeltaPropagation from [{}], containing [{}].",
fromNode.address,
deltas.collect { case (key, Delta(_, fromSeqNr, toSeqNr)) => s"$key $fromSeqNr-$toSeqNr" }.mkString(", "))
@ -1960,7 +1982,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug(
"Received gossip status from [{}], chunk [{}] of [{}] containing [{}]",
"Received gossip status from [{}], chunk [{}] of [{}] containing [{}].",
replyTo.path.address,
(chunk + 1),
totChunks,
@ -2008,7 +2030,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", "))
log.debug("Received gossip from [{}], containing [{}].", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[KeyId, DataEnvelope]
updatedData.foreach {
case (key, envelope) =>
@ -2545,7 +2567,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def waitReadRepairAck(envelope: Replicator.Internal.DataEnvelope): Receive = {
case ReadRepairAck =>
val replyMsg =
if (envelope.data == DeletedData) DataDeleted(key, req)
if (envelope.data == DeletedData) GetDataDeleted(key, req)
else GetSuccess(key, req)(envelope.data)
replyTo.tell(replyMsg, context.parent)
context.stop(self)

View file

@ -86,7 +86,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
within(5.seconds) {
awaitAssert {
replicator ! Get(key, ReadLocal)
expectMsg(DataDeleted(key, None))
expectMsg(GetDataDeleted(key, None))
}
}

View file

@ -4,14 +4,17 @@
package akka.cluster.ddata
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask
import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
object ReplicatorSpec extends MultiNodeConfig {
@ -74,6 +77,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
enterBarrier("after-" + afterCounter)
}
private implicit val askTimeout: Timeout = 5.seconds
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster.join(node(to).address)
@ -144,13 +149,24 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
expectMsg(DeleteSuccess(KeyX, Some(777)))
changedProbe.expectMsg(Deleted(KeyX))
replicator ! Get(KeyX, ReadLocal, Some(789))
expectMsg(DataDeleted(KeyX, Some(789)))
expectMsg(GetDataDeleted(KeyX, Some(789)))
replicator ! Get(KeyX, readAll, Some(456))
expectMsg(DataDeleted(KeyX, Some(456)))
expectMsg(GetDataDeleted(KeyX, Some(456)))
// verify ask-mapTo for GetDataDeleted, issue #27371
Await
.result((replicator ? Get(KeyX, ReadLocal, None)).mapTo[GetResponse[GCounter]], askTimeout.duration)
.getClass should be(classOf[GetDataDeleted[GCounter]])
replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1)
expectMsg(DataDeleted(KeyX, Some(123)))
expectMsg(UpdateDataDeleted(KeyX, Some(123)))
replicator ! Delete(KeyX, WriteLocal, Some(555))
expectMsg(DataDeleted(KeyX, Some(555)))
// verify ask-mapTo for UpdateDataDeleted, issue #27371
Await
.result(
(replicator ? Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1)).mapTo[UpdateResponse[GCounter]],
askTimeout.duration)
.getClass should be(classOf[UpdateDataDeleted[GCounter]])
replicator ! GetKeyIds
expectMsg(GetKeyIdsResult(Set("A")))
@ -309,7 +325,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
c.value should be(31)
replicator ! Get(KeyY, ReadLocal, Some(777))
expectMsg(DataDeleted(KeyY, Some(777)))
expectMsg(GetDataDeleted(KeyY, Some(777)))
}
}
changedProbe.expectMsgPF() { case c @ Changed(KeyC) => c.get(KeyC).value } should be(31)

View file

@ -32,14 +32,14 @@ After being deprecated since 2.5.0, the following have been removed in Akka 2.6.
- Use `akka.testkit.javadsl.TestKit` instead.
* `UntypedPersistentActor`
- Use `AbstractPersistentActor` instead.
* `UntypedPersistentActorWithAtLeastOnceDelivery`
* `UntypedPersistentActorWithAtLeastOnceDelivery`
- Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead.
After being deprecated since 2.2, the following have been removed in Akka 2.6.
* `actorFor`
* `actorFor`
- Use `ActorSelection` instead.
### Removed methods
* `Logging.getLogger(UntypedActor)` `UntypedActor` has been removed, use `AbstractActor` instead.
@ -338,6 +338,8 @@ This is described further in @ref:[inspecting sharding state](../cluster-shardin
### Distributed Data
#### Config for message payload size
Configuration properties for controlling sizes of `Gossip` and `DeltaPropagation` messages in Distributed Data
have been reduced. Previous defaults sometimes resulted in messages exceeding max payload size for remote
actor messages.
@ -349,6 +351,17 @@ akka.cluster.distributed-data.max-delta-elements = 500
akka.cluster.distributed-data.delta-crdt.max-delta-size = 50
```
#### DataDeleted
`DataDeleted` has been changed in its usage. While it is still a possible response to a Delete request,
it is no longer the response when an `Update` or `Get` request couldn't be performed because the entry has been deleted.
In its place are two new possible responses to a request, `UpdateDataDeleted` for an `Update` and `GetDataDeleted`
for a `Get`.
The reason for this change is that `DataDeleted` didn't extend the `UpdateResponse` and `GetResponse` types
and could therefore cause problems when `Update` and `Get` were used with `ask`. This was also a problem for
Akka Typed.
### CoordinatedShutdown is run from ActorSystem.terminate
No migration is needed but it is mentioned here because it is a change in behavior.
@ -379,7 +392,7 @@ down.
It is no longer required to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
Additionally when downstream of the IO-sources cancels with a failure, the materialized value
Additionally when downstream of the IO-sources cancels with a failure, the materialized value
is failed with that failure rather than completed successfully.
### Akka now uses Fork Join Pool from JDK
@ -424,7 +437,7 @@ The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no
### Naming convention changed
In needing a way to distinguish the new APIs in code and docs from the original, Akka used the naming
convention `untyped`. All references of the original have now been changed to `classic`. The
convention `untyped`. All references of the original have now been changed to `classic`. The
reference of the new APIs as `typed` is going away as it becomes the primary APIs.
### Receptionist has moved
@ -480,6 +493,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* Akka Typed is now using SLF4J as the logging API. @scala[`ActorContext.log`]@java[`ActorContext.getLog`] returns
an `org.slf4j.Logger`. MDC has been changed to only support `String` values.
* `setLoggerClass` in `ActorContext` has been renamed to `setLoggerName`.
* `GetDataDeleted` and `UpdateDataDeleted` introduced as described in @ref[DataDeleted](#datadeleted).
* `SubscribeResponse` introduced in `Subscribe` because the responses can be both `Changed` and `Deleted`.
* `ReplicationDeleteFailure` renamed to `DeleteFailure`.
#### Akka Typed Stream API changes
@ -497,22 +513,22 @@ for Scala an implicit materializer is provided if there is an implicit `ActorSys
materializers and simplifies most stream use cases somewhat.
The `ActorMaterializer` factories has been deprecated and replaced with a few corresponding factories in `akka.stream.Materializer`.
New factories with per-materializer settings has not been provided but should instead be done globally through config or per stream,
see below for more details.
New factories with per-materializer settings has not been provided but should instead be done globally through config or per stream,
see below for more details.
Having a default materializer available means that most, if not all, usages of Java `ActorMaterializer.create()`
and Scala `implicit val materializer = ActorMaterializer()` should be removed.
Having a default materializer available means that most, if not all, usages of Java `ActorMaterializer.create()`
and Scala `implicit val materializer = ActorMaterializer()` should be removed.
Details about the stream materializer can be found in [Actor Materializer Lifecycle](../stream/stream-flows-and-basics.md#actor-materializer-lifecycle)
When using streams from typed the same factories and methods for creating materializers and running streams as from classic can now be used with typed. The
When using streams from typed the same factories and methods for creating materializers and running streams as from classic can now be used with typed. The
`akka.stream.typed.scaladsl.ActorMaterializer` and `akka.stream.typed.javadsl.ActorMaterializerFactory` that previously existed in the `akka-stream-typed` module has been removed.
### Materializer settings deprecated
The `ActorMaterializerSettings` class has been deprecated.
All materializer settings are available as configuration to change the system default or through attributes that can be
All materializer settings are available as configuration to change the system default or through attributes that can be
used for individual streams when they are materialized.
| Materializer setting | Corresponding attribute | Setting |
@ -546,18 +562,18 @@ Java
### Stream cancellation available upstream
Previously an Akka streams stage or operator failed it was impossible to discern this from
Previously an Akka streams stage or operator failed it was impossible to discern this from
the stage just cancelling. This has been improved so that when a stream stage fails the cause
will be propagated upstream.
The following operators have a slight change in behavior because of this:
* `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with
* `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with
an `IOOperationIncompleteException` when downstream fails
* `.watchTermination` will fail the materialized `Future` or `CompletionStage` rather than completing it when downstream fails
* `StreamRef` - `SourceRef` will cancel with a failure when the receiving node is downed
* `StreamRef` - `SourceRef` will cancel with a failure when the receiving node is downed
This also means that custom `GraphStage` implementations should be changed to pass on the
cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature
taking a `cause` parameter and calling `cancelStage(cause)` to pass the cause upstream. The old zero-argument
`onDownstreamFinish` method has been deprecated.
cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature
taking a `cause` parameter and calling `cancelStage(cause)` to pass the cause upstream. The old zero-argument
`onDownstreamFinish` method has been deprecated.