stash messages while Replicator is loading durable data, #22150

Patrik Nordwall 2017-01-19 12:55:28 +01:00
parent d276a31c34
commit 572d0c8040
3 changed files with 74 additions and 44 deletions


@@ -893,6 +893,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val newSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
var subscriptionKeys = Map.empty[String, KeyR]
// To be able to do efficient stashing we use this field instead of sender().
// Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox.
// sender() and forward must not be used from normalReceive.
// It's set to sender() from aroundReceive, but is also used when unstashing
// messages after loading durable data.
var replyTo: ActorRef = null
override protected[akka] def aroundReceive(rcv: Actor.Receive, msg: Any): Unit = {
replyTo = sender()
try {
super.aroundReceive(rcv, msg)
} finally {
replyTo = null
}
}
override def preStart(): Unit = {
if (hasDurableKeys)
durableStore ! LoadAll
@@ -923,12 +939,25 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receive =
if (hasDurableKeys) load.orElse(normalReceive)
if (hasDurableKeys) load
else normalReceive
val load: Receive = {
val startTime = System.nanoTime()
var count = 0
// Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox.
var stash = Vector.empty[(Any, ActorRef)]
def unstashAll(): Unit = {
val originalReplyTo = replyTo
stash.foreach {
case (msg, snd) ⇒
replyTo = snd
normalReceive.applyOrElse(msg, unhandled)
}
stash = Vector.empty
replyTo = originalReplyTo
}
{
case LoadData(data) ⇒
@@ -945,23 +974,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
case LoadAllCompleted ⇒
log.debug(
"Loading {} entries from durable store took {} ms",
count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))
"Loading {} entries from durable store took {} ms, stashed {}",
count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime), stash.size)
context.become(normalReceive)
unstashAll()
self ! FlushChanges
case GetReplicaCount ⇒
// 0 until durable data has been loaded, used by test
sender() ! ReplicaCount(0)
replyTo ! ReplicaCount(0)
case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
// ignore scheduled ticks when loading durable data
case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
// ignore gossip and replication when loading durable data
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
case msg: ClusterDomainEvent ⇒ normalReceive.applyOrElse(msg, unhandled)
case msg ⇒
stash :+= (msg → replyTo)
}
}
// MUST use replyTo instead of sender() and forward from normalReceive, because of the stash in load
val normalReceive: Receive = {
case Get(key, consistency, req) ⇒ receiveGet(key, consistency, req)
case u @ Update(key, writeC, req) ⇒ receiveUpdate(key, u.modify, writeC, req)
@@ -999,9 +1033,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Some(DataEnvelope(data, _)) ⇒ GetSuccess(key, req)(data)
case None ⇒ NotFound(key, req)
}
sender() ! reply
replyTo ! reply
} else
context.actorOf(ReadAggregator.props(key, consistency, req, nodes, unreachable, localValue, sender())
context.actorOf(ReadAggregator.props(key, consistency, req, nodes, unreachable, localValue, replyTo)
.withDispatcher(context.props.dispatcher))
}
@@ -1013,10 +1047,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveRead(key: String): Unit = {
sender() ! ReadResult(getData(key))
replyTo ! ReadResult(getData(key))
}
def isLocalSender(): Boolean = !sender().path.address.hasGlobalScope
def isLocalSender(): Boolean = !replyTo.path.address.hasGlobalScope
def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData,
writeConsistency: WriteConsistency, req: Option[Any]): Unit = {
@@ -1037,12 +1071,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (isLocalUpdate(writeConsistency)) {
if (durable)
durableStore ! Store(key.id, envelope.data,
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), sender())))
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
else
sender() ! UpdateSuccess(key, req)
replyTo ! UpdateSuccess(key, req)
} else {
val writeAggregator =
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, sender(), durable)
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, envelope.data,
@@ -1051,10 +1085,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
case Failure(e: DataDeleted[_]) ⇒
log.debug("Received Update for deleted key [{}]", key)
sender() ! e
replyTo ! e
case Failure(e) ⇒
log.debug("Received Update for key [{}], failed: {}", key, e.getMessage)
sender() ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req)
replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req)
}
}
@@ -1072,9 +1106,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, sender())))
durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, replyTo)))
else
sender() ! WriteAck
replyTo ! WriteAck
case None ⇒
}
}
@@ -1106,33 +1140,33 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
durableStore ! Store(key, newEnvelope.data, None)
case None ⇒
}
sender() ! ReadRepairAck
replyTo ! ReadRepairAck
}
def receiveGetKeyIds(): Unit = {
val keys: Set[String] = dataEntries.collect {
case (key, (DataEnvelope(data, _), _)) if data != DeletedData ⇒ key
}(collection.breakOut)
sender() ! GetKeyIdsResult(keys)
replyTo ! GetKeyIdsResult(keys)
}
def receiveDelete(key: KeyR, consistency: WriteConsistency, req: Option[Any]): Unit = {
getData(key.id) match {
case Some(DataEnvelope(DeletedData, _)) ⇒
// already deleted
sender() ! DataDeleted(key, req)
replyTo ! DataDeleted(key, req)
case _ ⇒
setData(key.id, DeletedEnvelope)
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
if (durable)
durableStore ! Store(key.id, DeletedData,
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), sender())))
Some(StoreReply(DeleteSuccess(key, req), StoreFailure(key, req), replyTo)))
else
sender() ! DeleteSuccess(key, req)
replyTo ! DeleteSuccess(key, req)
} else {
val writeAggregator =
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, sender(), durable)
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, DeletedData,
@@ -1238,7 +1272,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", sender().path.address,
log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", replyTo.path.address,
(chunk + 1), totChunks, otherDigests.keys.mkString(", "))
def isOtherDifferent(key: String, otherDigest: Digest): Boolean = {
@@ -1256,22 +1290,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
if (keys.nonEmpty) {
if (log.isDebugEnabled)
log.debug("Sending gossip to [{}], containing [{}]", sender().path.address, keys.mkString(", "))
log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", "))
val g = Gossip(keys.map(k ⇒ k → getData(k).get)(collection.breakOut), sendBack = otherDifferentKeys.nonEmpty)
sender() ! g
replyTo ! g
}
val myMissingKeys = otherKeys diff myKeys
if (myMissingKeys.nonEmpty) {
if (log.isDebugEnabled)
log.debug("Sending gossip status to [{}], requesting missing [{}]", sender().path.address, myMissingKeys.mkString(", "))
log.debug("Sending gossip status to [{}], requesting missing [{}]", replyTo.path.address, myMissingKeys.mkString(", "))
val status = Status(myMissingKeys.map(k ⇒ k → NotFoundDigest)(collection.breakOut), chunk, totChunks)
sender() ! status
replyTo ! status
}
}
def receiveGossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}]", sender().path.address, updatedData.keys.mkString(", "))
log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[String, DataEnvelope]
updatedData.foreach {
case (key, envelope) ⇒
@@ -1290,7 +1324,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
if (sendBack && replyData.nonEmpty)
sender() ! Gossip(replyData, sendBack = false)
replyTo ! Gossip(replyData, sendBack = false)
}
def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {
@@ -1484,7 +1518,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveGetReplicaCount(): Unit = {
// selfAddress is not included in the set
sender() ! ReplicaCount(nodes.size + 1)
replyTo ! ReplicaCount(nodes.size + 1)
}
}
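For context on the pattern this diff introduces: replies are routed through the replyTo field that aroundReceive captures, and an internal Vector of (message, sender) pairs replaces the Stash trait while durable data is loading. Below is a rough, self-contained sketch of the same idea; the message types (LoadData, Get, GetResult) and the dispatching receive wrapper are illustrative stand-ins, since aroundReceive is protected[akka] and cannot be overridden outside Akka's own packages.

import akka.actor.{ Actor, ActorRef }

// Illustrative stand-ins for the durable-store protocol in this commit.
final case class LoadData(entries: Map[String, String])
case object LoadAllCompleted
final case class Get(key: String)
final case class GetResult(key: String, value: Option[String])

class LoadingActor extends Actor {
  // Set once per message by the dispatching receive below; replies MUST go to
  // replyTo, because during unstashing sender() would be the durable store,
  // not the original client.
  private var replyTo: ActorRef = null
  private var behavior: Receive = loading
  // Internal buffer instead of the Stash trait, as in the commit above.
  private var stash = Vector.empty[(Any, ActorRef)]
  private var data = Map.empty[String, String]

  // Plays the role of the commit's aroundReceive override: capture sender()
  // exactly once per message, then dispatch to the current behavior.
  def receive: Receive = {
    case msg ⇒
      replyTo = sender()
      try behavior.applyOrElse(msg, unhandled) finally replyTo = null
  }

  private def loading: Receive = {
    case LoadData(entries) ⇒ data ++= entries
    case LoadAllCompleted ⇒
      behavior = normal
      unstashAll()
    case msg ⇒ stash :+= (msg → replyTo) // remember the original sender
  }

  private def normal: Receive = {
    case Get(key) ⇒ replyTo ! GetResult(key, data.get(key))
  }

  private def unstashAll(): Unit = {
    val original = replyTo
    stash.foreach {
      case (msg, snd) ⇒
        replyTo = snd // restore the stashed sender before handling
        normal.applyOrElse(msg, unhandled)
    }
    stash = Vector.empty
    replyTo = original
  }
}

The comments in the diff give the motivation for the hand-rolled buffer: a plain Vector avoids the deque-based mailbox that the Stash trait requires, and replaying in place lets unstashed messages be answered at their original sender through replyTo rather than sender().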


@@ -137,11 +137,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
var r2: ActorRef = null
awaitAssert(r2 = newReplicator()) // try until name is free
// wait until all loaded
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
}
// note that it will stash the commands until loading completed
r2 ! Get(KeyA, ReadLocal)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3)


@@ -168,16 +168,16 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
import ReadWriteAggregator._
calculateMajorityWithMinCap(minCap, 3) should be (3)
calculateMajorityWithMinCap(minCap, 4) should be (4)
calculateMajorityWithMinCap(minCap, 5) should be (5)
calculateMajorityWithMinCap(minCap, 6) should be (5)
calculateMajorityWithMinCap(minCap, 7) should be (5)
calculateMajorityWithMinCap(minCap, 8) should be (5)
calculateMajorityWithMinCap(minCap, 9) should be (5)
calculateMajorityWithMinCap(minCap, 10) should be (6)
calculateMajorityWithMinCap(minCap, 11) should be (6)
calculateMajorityWithMinCap(minCap, 12) should be (7)
calculateMajorityWithMinCap(minCap, 3) should be(3)
calculateMajorityWithMinCap(minCap, 4) should be(4)
calculateMajorityWithMinCap(minCap, 5) should be(5)
calculateMajorityWithMinCap(minCap, 6) should be(5)
calculateMajorityWithMinCap(minCap, 7) should be(5)
calculateMajorityWithMinCap(minCap, 8) should be(5)
calculateMajorityWithMinCap(minCap, 9) should be(5)
calculateMajorityWithMinCap(minCap, 10) should be(6)
calculateMajorityWithMinCap(minCap, 11) should be(6)
calculateMajorityWithMinCap(minCap, 12) should be(7)
}
}
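For reference, the expected values in this spec are consistent with a majority that is bounded below by minCap (which appears to be 5 here, judging by the expectations) and never exceeds the number of nodes. A sketch of a function matching these expectations, not necessarily Akka's exact implementation:

def calculateMajorityWithMinCap(minCap: Int, numberOfNodes: Int): Int =
  // at least minCap replicas, at most all nodes, otherwise a strict majority
  math.min(numberOfNodes, math.max(minCap, numberOfNodes / 2 + 1))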