diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index c5da8ae796..405d517689 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -893,6 +893,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val newSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] var subscriptionKeys = Map.empty[String, KeyR] + // To be able to do efficient stashing we use this field instead of sender(). + // Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox. + // sender() and forward must not be used from normalReceive. + // It's set to sender() from aroundReceive, but is also used when unstashing + // messages after loading durable data. + var replyTo: ActorRef = null + + override protected[akka] def aroundReceive(rcv: Actor.Receive, msg: Any): Unit = { + replyTo = sender() + try { + super.aroundReceive(rcv, msg) + } finally { + replyTo = null + } + } + override def preStart(): Unit = { if (hasDurableKeys) durableStore ! LoadAll @@ -923,12 +939,25 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def receive = - if (hasDurableKeys) load.orElse(normalReceive) + if (hasDurableKeys) load else normalReceive val load: Receive = { val startTime = System.nanoTime() var count = 0 + // Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox. + var stash = Vector.empty[(Any, ActorRef)] + + def unstashAll(): Unit = { + val originalReplyTo = replyTo + stash.foreach { + case (msg, snd) ⇒ + replyTo = snd + normalReceive.applyOrElse(msg, unhandled) + } + stash = Vector.empty + replyTo = originalReplyTo + } { case LoadData(data) ⇒ @@ -945,23 +974,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } case LoadAllCompleted ⇒ log.debug( - "Loading {} entries from durable store took {} ms", - count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) + "Loading {} entries from durable store took {} ms, stashed {}", + count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime), stash.size) context.become(normalReceive) + unstashAll() self ! FlushChanges case GetReplicaCount ⇒ // 0 until durable data has been loaded, used by test - sender() ! ReplicaCount(0) + replyTo ! ReplicaCount(0) case RemovedNodePruningTick | FlushChanges | GossipTick ⇒ // ignore scheduled ticks when loading durable data case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒ // ignore gossip and replication when loading durable data log.debug("ignoring message [{}] when loading durable data", m.getClass.getName) + case msg: ClusterDomainEvent ⇒ normalReceive.applyOrElse(msg, unhandled) + case msg ⇒ + stash :+= (msg → replyTo) } } + // MUST use replyTo instead of sender() and forward from normalReceive, because of the stash in load val normalReceive: Receive = { case Get(key, consistency, req) ⇒ receiveGet(key, consistency, req) case u @ Update(key, writeC, req) ⇒ receiveUpdate(key, u.modify, writeC, req) @@ -999,9 +1033,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case Some(DataEnvelope(data, _)) ⇒ GetSuccess(key, req)(data) case None ⇒ NotFound(key, req) } - sender() ! reply + replyTo ! reply } else - context.actorOf(ReadAggregator.props(key, consistency, req, nodes, unreachable, localValue, sender()) + context.actorOf(ReadAggregator.props(key, consistency, req, nodes, unreachable, localValue, replyTo) .withDispatcher(context.props.dispatcher)) } @@ -1013,10 +1047,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def receiveRead(key: String): Unit = { - sender() ! ReadResult(getData(key)) + replyTo ! ReadResult(getData(key)) } - def isLocalSender(): Boolean = !sender().path.address.hasGlobalScope + def isLocalSender(): Boolean = !replyTo.path.address.hasGlobalScope def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData, writeConsistency: WriteConsistency, req: Option[Any]): Unit = { @@ -1037,12 +1071,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (isLocalUpdate(writeConsistency)) { if (durable) durableStore ! Store(key.id, envelope.data, - Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), sender()))) + Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo))) else - sender() ! UpdateSuccess(key, req) + replyTo ! UpdateSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, sender(), durable) + context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, envelope.data, @@ -1051,10 +1085,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } case Failure(e: DataDeleted[_]) ⇒ log.debug("Received Update for deleted key [{}]", key) - sender() ! e + replyTo ! e case Failure(e) ⇒ log.debug("Received Update for key [{}], failed: {}", key, e.getMessage) - sender() ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req) + replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req) } } @@ -1072,9 +1106,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog write(key, envelope) match { case Some(newEnvelope) ⇒ if (isDurable(key)) - durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, sender()))) + durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, replyTo))) else - sender() ! WriteAck + replyTo ! WriteAck case None ⇒ } } @@ -1106,33 +1140,33 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog durableStore ! Store(key, newEnvelope.data, None) case None ⇒ } - sender() ! ReadRepairAck + replyTo ! ReadRepairAck } def receiveGetKeyIds(): Unit = { val keys: Set[String] = dataEntries.collect { case (key, (DataEnvelope(data, _), _)) if data != DeletedData ⇒ key }(collection.breakOut) - sender() ! GetKeyIdsResult(keys) + replyTo ! GetKeyIdsResult(keys) } def receiveDelete(key: KeyR, consistency: WriteConsistency, req: Option[Any]): Unit = { getData(key.id) match { case Some(DataEnvelope(DeletedData, _)) ⇒ // already deleted - sender() ! DataDeleted(key, req) + replyTo ! DataDeleted(key, req) case _ ⇒ setData(key.id, DeletedEnvelope) val durable = isDurable(key.id) if (isLocalUpdate(consistency)) { if (durable) durableStore ! Store(key.id, DeletedData, - Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), sender()))) + Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), replyTo))) else - sender() ! DeleteSuccess(key, req) + replyTo ! DeleteSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, sender(), durable) + context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, DeletedData, @@ -1238,7 +1272,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit = { if (log.isDebugEnabled) - log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", sender().path.address, + log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", replyTo.path.address, (chunk + 1), totChunks, otherDigests.keys.mkString(", ")) def isOtherDifferent(key: String, otherDigest: Digest): Boolean = { @@ -1256,22 +1290,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements) if (keys.nonEmpty) { if (log.isDebugEnabled) - log.debug("Sending gossip to [{}], containing [{}]", sender().path.address, keys.mkString(", ")) + log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", ")) val g = Gossip(keys.map(k ⇒ k → getData(k).get)(collection.breakOut), sendBack = otherDifferentKeys.nonEmpty) - sender() ! g + replyTo ! g } val myMissingKeys = otherKeys diff myKeys if (myMissingKeys.nonEmpty) { if (log.isDebugEnabled) - log.debug("Sending gossip status to [{}], requesting missing [{}]", sender().path.address, myMissingKeys.mkString(", ")) + log.debug("Sending gossip status to [{}], requesting missing [{}]", replyTo.path.address, myMissingKeys.mkString(", ")) val status = Status(myMissingKeys.map(k ⇒ k → NotFoundDigest)(collection.breakOut), chunk, totChunks) - sender() ! status + replyTo ! status } } def receiveGossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean): Unit = { if (log.isDebugEnabled) - log.debug("Received gossip from [{}], containing [{}]", sender().path.address, updatedData.keys.mkString(", ")) + log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", ")) var replyData = Map.empty[String, DataEnvelope] updatedData.foreach { case (key, envelope) ⇒ @@ -1290,7 +1324,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } if (sendBack && replyData.nonEmpty) - sender() ! Gossip(replyData, sendBack = false) + replyTo ! Gossip(replyData, sendBack = false) } def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = { @@ -1484,7 +1518,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveGetReplicaCount(): Unit = { // selfAddress is not included in the set - sender() ! ReplicaCount(nodes.size + 1) + replyTo ! ReplicaCount(nodes.size + 1) } } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala index b07824374a..bfb8fb16f8 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala @@ -137,11 +137,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) var r2: ActorRef = null awaitAssert(r2 = newReplicator()) // try until name is free - // wait until all loaded - awaitAssert { - r2 ! GetKeyIds - expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String]) - } + // note that it will stash the commands until loading completed r2 ! Get(KeyA, ReadLocal) expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 8a4fcf2686..2547a7cebe 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -168,16 +168,16 @@ class WriteAggregatorSpec extends AkkaSpec(s""" import ReadWriteAggregator._ - calculateMajorityWithMinCap(minCap, 3) should be (3) - calculateMajorityWithMinCap(minCap, 4) should be (4) - calculateMajorityWithMinCap(minCap, 5) should be (5) - calculateMajorityWithMinCap(minCap, 6) should be (5) - calculateMajorityWithMinCap(minCap, 7) should be (5) - calculateMajorityWithMinCap(minCap, 8) should be (5) - calculateMajorityWithMinCap(minCap, 9) should be (5) - calculateMajorityWithMinCap(minCap, 10) should be (6) - calculateMajorityWithMinCap(minCap, 11) should be (6) - calculateMajorityWithMinCap(minCap, 12) should be (7) + calculateMajorityWithMinCap(minCap, 3) should be(3) + calculateMajorityWithMinCap(minCap, 4) should be(4) + calculateMajorityWithMinCap(minCap, 5) should be(5) + calculateMajorityWithMinCap(minCap, 6) should be(5) + calculateMajorityWithMinCap(minCap, 7) should be(5) + calculateMajorityWithMinCap(minCap, 8) should be(5) + calculateMajorityWithMinCap(minCap, 9) should be(5) + calculateMajorityWithMinCap(minCap, 10) should be(6) + calculateMajorityWithMinCap(minCap, 11) should be(6) + calculateMajorityWithMinCap(minCap, 12) should be(7) } }