diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 8172f53cdb..e08f0f7bac 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -4,9 +4,7 @@ package akka.cluster.ddata import java.security.MessageDigest -import scala.annotation.tailrec import scala.collection.immutable -import scala.collection.immutable.Queue import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration @@ -969,7 +967,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } sender() ! reply } else - context.actorOf(ReadAggregator.props(key, consistency, req, nodes, localValue, sender()) + context.actorOf(ReadAggregator.props(key, consistency, req, nodes, unreachable, localValue, sender()) .withDispatcher(context.props.dispatcher)) } @@ -1010,7 +1008,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog sender() ! UpdateSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, sender(), durable) + context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, sender(), durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, envelope.data, @@ -1100,7 +1098,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog sender() ! DeleteSuccess(key) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, sender(), durable) + context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, unreachable, sender(), durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! 
Store(key.id, DeletedData, @@ -1473,6 +1471,8 @@ private[akka] abstract class ReadWriteAggregator extends Actor { def timeout: FiniteDuration def nodes: Set[Address] + def unreachable: Set[Address] + def reachableNodes: Set[Address] = nodes diff unreachable import context.dispatcher var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, self, SendToSecondary) @@ -1487,7 +1487,9 @@ private[akka] abstract class ReadWriteAggregator extends Actor { if (primarySize >= nodes.size) (nodes, Set.empty[Address]) else { - val (p, s) = scala.util.Random.shuffle(nodes.toVector).splitAt(primarySize) + // Prefer reachable nodes over unreachable nodes by placing the reachable ones first in the ordering + val orderedNodes = scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector) + val (p, s) = orderedNodes.splitAt(primarySize) (p, s.take(MaxSecondaryNodes)) } } @@ -1512,9 +1514,10 @@ private[akka] object WriteAggregator { consistency: Replicator.WriteConsistency, req: Option[Any], nodes: Set[Address], + unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new WriteAggregator(key, envelope, consistency, req, nodes, replyTo, durable)) + Props(new WriteAggregator(key, envelope, consistency, req, nodes, unreachable, replyTo, durable)) .withDeploy(Deploy.local) } @@ -1522,13 +1525,14 @@ private[akka] object WriteAggregator { * INTERNAL API */ private[akka] class WriteAggregator( - key: KeyR, - envelope: Replicator.Internal.DataEnvelope, - consistency: Replicator.WriteConsistency, - req: Option[Any], - override val nodes: Set[Address], - replyTo: ActorRef, - durable: Boolean) extends ReadWriteAggregator { + key: KeyR, + envelope: Replicator.Internal.DataEnvelope, + consistency: Replicator.WriteConsistency, + req: Option[Any], + override val nodes: Set[Address], + override val unreachable: Set[Address], + replyTo: ActorRef, + durable: Boolean) extends ReadWriteAggregator { import Replicator._ import Replicator.Internal._ @@ -1616,9 +1620,10 @@ private[akka] object ReadAggregator { consistency: Replicator.ReadConsistency, req: Option[Any], nodes: Set[Address], + unreachable: Set[Address], localValue: Option[Replicator.Internal.DataEnvelope], replyTo: ActorRef): Props = - Props(new ReadAggregator(key, consistency, req, nodes, localValue, replyTo)) + Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo)) .withDeploy(Deploy.local) } @@ -1627,12 +1632,13 @@ private[akka] object ReadAggregator { * INTERNAL API */ private[akka] class ReadAggregator( - key: KeyR, - consistency: Replicator.ReadConsistency, - req: Option[Any], - override val nodes: Set[Address], - localValue: Option[Replicator.Internal.DataEnvelope], - replyTo: ActorRef) extends ReadWriteAggregator { + key: KeyR, + consistency: Replicator.ReadConsistency, + req: Option[Any], + override val nodes: Set[Address], + override val unreachable: Set[Address], + localValue: Option[Replicator.Internal.DataEnvelope], + replyTo: ActorRef) extends ReadWriteAggregator { import Replicator._ import Replicator.Internal._ diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 36256b7455..0fda3f28a0 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -6,26 +6,29 @@ package akka.cluster.ddata import 
scala.concurrent.duration._ import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSelection +import akka.actor.ActorSystem +import akka.actor.Address import akka.actor.Props import akka.testkit._ -import akka.actor.Address -import akka.actor.ActorRef import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.Replicator._ -import akka.actor.ActorSelection import akka.remote.RARP +import scala.concurrent.Future + object WriteAggregatorSpec { val key = GSetKey[String]("a") def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency, - probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo, durable)) + probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = + Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable)) class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency, - probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean) - extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo, durable) { + probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean) + extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) { override def replica(address: Address): ActorSelection = context.actorSelection(probes(address).path) @@ -50,6 +53,13 @@ object WriteAggregatorSpec { replica ! msg } } + + object TestMock { + def apply()(implicit system: ActorSystem) = new TestMock(system) + } + class TestMock(_application: ActorSystem) extends TestProbe(_application) { + val writeAckAdapter = system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(this.ref)) + } } class WriteAggregatorSpec extends AkkaSpec(s""" @@ -62,6 +72,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" } """) with ImplicitSender { + import WriteAggregatorSpec._ val protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" @@ -82,11 +93,18 @@ class WriteAggregatorSpec extends AkkaSpec(s""" def probes(probe: ActorRef): Map[Address, ActorRef] = nodes.toSeq.map(_ → system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap + /** + * Create a TestMock (a TestProbe with its own WriteAckAdapter) for each node + */ + def probes(): Map[Address, TestMock] = { + nodes.toSeq.map(_ → TestMock()).toMap + } + "WriteAggregator" must { "send to at least N/2+1 replicas when WriteMajority" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeMajority, probes(probe.ref), nodes, testActor, durable = false)) + data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) probe.expectMsgType[Write] probe.lastSender ! 
WriteAck @@ -98,19 +116,29 @@ class WriteAggregatorSpec extends AkkaSpec(s""" } "send to more when no immediate reply" in { - val probe = TestProbe() + val testProbes = probes() + val testProbeRefs = testProbes.map { case (a, tm) ⇒ a → tm.writeAckAdapter } val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeMajority, probes(probe.ref), nodes, testActor, durable = false)) + data, writeMajority, testProbeRefs, nodes, Set(nodeC, nodeD), testActor, durable = false)) - probe.expectMsgType[Write] + testProbes(nodeA).expectMsgType[Write] // no reply - probe.expectMsgType[Write] - // no reply - probe.lastSender ! WriteAck - probe.expectMsgType[Write] - probe.lastSender ! WriteAck - probe.expectMsgType[Write] - probe.lastSender ! WriteAck + testProbes(nodeB).expectMsgType[Write] + testProbes(nodeB).lastSender ! WriteAck + // Make sure the unreachable nodes are not contacted until 1/5 of the timeout has passed without enough acks from the reachable nodes + val t = timeout / 5 - 50.milliseconds.dilated + import system.dispatcher + Future.sequence { + Seq( + Future { testProbes(nodeC).expectNoMsg(t) }, + Future { testProbes(nodeD).expectNoMsg(t) } + ) + }.futureValue + testProbes(nodeC).expectMsgType[Write] + testProbes(nodeC).lastSender ! WriteAck + testProbes(nodeD).expectMsgType[Write] + testProbes(nodeD).lastSender ! WriteAck + expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) @@ -119,7 +147,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeMajority, probes(probe.ref), nodes, testActor, durable = false)) + data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) probe.expectMsgType[Write] // no reply @@ -139,7 +167,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "not reply before local confirmation" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeThree, probes(probe.ref), nodes, testActor, durable = true)) + data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] probe.lastSender ! WriteAck @@ -158,7 +186,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "tolerate WriteNack if enough WriteAck" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeThree, probes(probe.ref), nodes, testActor, durable = true)) + data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write probe.expectMsgType[Write] @@ -176,7 +204,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "reply with StoreFailure when too many nacks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeMajority, probes(probe.ref), nodes, testActor, durable = true)) + data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] probe.lastSender ! 
WriteNack @@ -196,7 +224,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( - data, writeMajority, probes(probe.ref), nodes, testActor, durable = true)) + data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] // no reply diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index 5fe21ea95a..d64543a951 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -71,7 +71,7 @@ function that only uses the data parameter and stable fields from enclosing scop for example not access ``sender()`` reference of an enclosing actor. ``Update`` is intended to only be sent from an actor running in same local ``ActorSystem`` as - * the `Replicator`, because the `modify` function is typically not serializable. + the ``Replicator``, because the ``modify`` function is typically not serializable. You supply a write consistency level which has the following meaning: @@ -84,7 +84,11 @@ You supply a write consistency level which has the following meaning: (or cluster role group) * ``writeAll`` the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group) - + +When you specify that the update should be written to ``n`` out of ``x`` nodes, it will first be replicated to ``n`` nodes. +If there are not enough acknowledgements after 1/5th of the timeout, the update will be replicated to ``n`` other nodes. +If there are fewer than ``n`` nodes left, all of the remaining nodes are used. Reachable nodes are preferred over unreachable nodes. + .. includecode:: code/docs/ddata/DistributedDataDocTest.java#update As reply of the ``Update`` a ``Replicator.UpdateSuccess`` is sent to the sender of the diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 22dd241187..4e25bfd84f 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -71,7 +71,7 @@ function that only uses the data parameter and stable fields from enclosing scop for example not access ``sender()`` reference of an enclosing actor. ``Update`` is intended to only be sent from an actor running in same local ``ActorSystem`` as - * the `Replicator`, because the `modify` function is typically not serializable. + the ``Replicator``, because the ``modify`` function is typically not serializable. You supply a write consistency level which has the following meaning: @@ -84,7 +84,11 @@ You supply a write consistency level which has the following meaning: (or cluster role group) * ``WriteAll`` the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group) - + +When you specify that the update should be written to ``n`` out of ``x`` nodes, it will first be replicated to ``n`` nodes. +If there are not enough acknowledgements after 1/5th of the timeout, the update will be replicated to ``n`` other nodes. +If there are fewer than ``n`` nodes left, all of the remaining nodes are used. Reachable nodes are preferred over unreachable nodes. + .. 
includecode:: code/docs/ddata/DistributedDataDocSpec.scala#update As reply of the ``Update`` a ``Replicator.UpdateSuccess`` is sent to the sender of the diff --git a/project/MiMa.scala b/project/MiMa.scala index aa66bf75c7..0aecd1afd5 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -149,7 +149,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingClassProblem]("akka.remote.security.provider.AES128CounterInetRNG"), ProblemFilters.exclude[MissingClassProblem]("akka.remote.security.provider.AES256CounterInetRNG"), ProblemFilters.exclude[MissingClassProblem]("akka.remote.security.provider.InternetSeedGenerator"), - ProblemFilters.exclude[MissingClassProblem]("akka.remote.security.provider.InternetSeedGenerator$") + ProblemFilters.exclude[MissingClassProblem]("akka.remote.security.provider.InternetSeedGenerator$"), + + // #21648 Prefer reachable nodes in consistency writes/reads + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.unreachable"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.props") ) Map(
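
Reviewer note: below is a minimal, self-contained sketch of the node-selection rule this patch adds in ReadWriteAggregator.primaryAndSecondaryNodes, to make the new behaviour easier to reason about. The standalone signature (the type parameter A and the explicit primarySize / maxSecondaryNodes parameters) is illustrative only; in the actual code these values come from the aggregator's fields.

import scala.util.Random

// Reachable nodes are shuffled and placed first, unreachable nodes last. The first
// primarySize nodes form the primary set that is contacted immediately; the next
// maxSecondaryNodes form the secondary set that is only contacted when the
// SendToSecondary tick fires after timeout / 5 and the aggregator is still waiting for replies.
def primaryAndSecondary[A](nodes: Set[A], unreachable: Set[A],
                           primarySize: Int, maxSecondaryNodes: Int): (Vector[A], Vector[A]) = {
  val reachable = nodes diff unreachable
  if (primarySize >= nodes.size) (nodes.toVector, Vector.empty)
  else {
    val ordered = Random.shuffle(reachable.toVector) ++ Random.shuffle(unreachable.toVector)
    val (primary, secondary) = ordered.splitAt(primarySize)
    (primary, secondary.take(maxSecondaryNodes))
  }
}

For example, with nodes = Set("a", "b", "c", "d"), unreachable = Set("c", "d") and primarySize = 2, the primary set always consists of the two reachable nodes and the unreachable ones can only appear as secondaries, which is the property the updated "send to more when no immediate reply" test exercises by marking nodeC and nodeD as unreachable.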