=rem #3527 Throw away system message delivery state when new UID from a remote Address is detected

This commit is contained in:
Endre Sándor Varga 2013-08-19 15:34:24 +02:00 committed by Patrik Nordwall
parent 5a1d9136f0
commit cc15919512
2 changed files with 53 additions and 28 deletions

View file

@ -10,8 +10,7 @@ import akka.actor._
import akka.dispatch.sysmsg.SystemMessage
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.remote.EndpointManager.Link
import akka.remote.EndpointManager.Send
import akka.remote.EndpointManager.{ ResendState, Link, Send }
import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop }
import akka.remote.WireFormats.SerializedMessage
import akka.remote.transport.AkkaPduCodec.Message
@ -157,7 +156,7 @@ private[remote] object ReliableDeliverySupervisor {
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props =
Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings,
codec, receiveBuffers)
}
@ -172,7 +171,7 @@ private[remote] class ReliableDeliverySupervisor(
val transport: Transport,
val settings: RemoteSettings,
val codec: AkkaPduCodec,
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor {
val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor {
import ReliableDeliverySupervisor._
import context.dispatcher
@ -194,18 +193,26 @@ private[remote] class ReliableDeliverySupervisor(
}
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
var resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
var resendDeadline = Deadline.now + settings.SysResendTimeout
var lastCumulativeAck = SeqNo(-1)
val nextSeq = {
var seqCounter: Long = 0L
() {
var resendBuffer: AckedSendBuffer[Send] = _
var resendDeadline: Deadline = _
var lastCumulativeAck: SeqNo = _
var seqCounter: Long = _
def reset() {
resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = SeqNo(-1)
seqCounter = 0L
}
reset()
def nextSeq(): SeqNo = {
val tmp = seqCounter
seqCounter += 1
SeqNo(tmp)
}
}
var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
@ -235,7 +242,12 @@ private[remote] class ReliableDeliverySupervisor(
case s: Send
handleSend(s)
case ack: Ack
resendBuffer = resendBuffer.acknowledge(ack)
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e)
throw new InvalidAssociationException("Error encountered while processing system message acknowledgement", e)
}
if (lastCumulativeAck < ack.cumulativeAck) {
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = ack.cumulativeAck
@ -250,7 +262,10 @@ private[remote] class ReliableDeliverySupervisor(
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
case GotUid(u) uid = Some(u)
case GotUid(u)
// New system that has the same address as the old - need to start from fresh state
if (uid.isDefined && uid.get != u) reset()
uid = Some(u)
case s: EndpointWriter.StopReading writer forward s
}
@ -358,7 +373,7 @@ private[remote] object EndpointWriter {
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
receiveBuffers: ConcurrentHashMap[Link, ResendState],
reliableDeliverySupervisor: Option[ActorRef]): Props =
Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec,
receiveBuffers, reliableDeliverySupervisor)
@ -398,7 +413,7 @@ private[remote] class EndpointWriter(
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
val receiveBuffers: ConcurrentHashMap[Link, ResendState],
val reliableDeliverySupervisor: Option[ActorRef])
extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash
with FSM[EndpointWriter.State, Unit] {
@ -603,7 +618,7 @@ private[remote] class EndpointWriter(
val newReader =
context.watch(context.actorOf(
RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local),
msgDispatch, inbound, handle.handshakeInfo.uid, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local),
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
Some(newReader)
@ -633,10 +648,11 @@ private[remote] object EndpointReader {
codec: AkkaPduCodec,
msgDispatch: InboundMessageDispatcher,
inbound: Boolean,
uid: Int,
reliableDeliverySupervisor: Option[ActorRef],
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props =
Props(classOf[EndpointReader], localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound,
reliableDeliverySupervisor, receiveBuffers)
uid, reliableDeliverySupervisor, receiveBuffers)
}
@ -651,8 +667,9 @@ private[remote] class EndpointReader(
codec: AkkaPduCodec,
msgDispatch: InboundMessageDispatcher,
val inbound: Boolean,
val uid: Int,
val reliableDeliverySupervisor: Option[ActorRef],
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {
val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {
import EndpointWriter.{ OutboundAck, StopReading, StoppedReading }
@ -662,20 +679,26 @@ private[remote] class EndpointReader(
override def preStart(): Unit = {
receiveBuffers.get(Link(localAddress, remoteAddress)) match {
case null
case buf
ackedReceiveBuffer = buf
case ResendState(`uid`, buffer)
ackedReceiveBuffer = buffer
deliverAndAck()
case _
}
}
override def postStop(): Unit = saveState()
def saveState(): Unit = {
def merge(currentState: ResendState, oldState: ResendState): ResendState =
if (currentState.uid == oldState.uid) ResendState(uid, oldState.buffer.mergeFrom(currentState.buffer))
else currentState
@tailrec
def updateSavedState(key: Link, expectedState: AckedReceiveBuffer[Message]): Unit = {
def updateSavedState(key: Link, expectedState: ResendState): Unit = {
if (expectedState eq null) {
if (receiveBuffers.putIfAbsent(key, ackedReceiveBuffer) ne null) updateSavedState(key, receiveBuffers.get(key))
} else if (!receiveBuffers.replace(key, expectedState, expectedState.mergeFrom(ackedReceiveBuffer)))
if (receiveBuffers.putIfAbsent(key, ResendState(uid, ackedReceiveBuffer)) ne null)
updateSavedState(key, receiveBuffers.get(key))
} else if (!receiveBuffers.replace(key, expectedState, merge(ResendState(uid, ackedReceiveBuffer), expectedState)))
updateSavedState(key, receiveBuffers.get(key))
}

View file

@ -258,6 +258,8 @@ private[remote] object EndpointManager {
// Helper class to store address pairs
case class Link(localAddress: Address, remoteAddress: Address)
case class ResendState(uid: Int, buffer: AckedReceiveBuffer[Message])
sealed trait EndpointPolicy {
/**
@ -429,7 +431,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
// Structure for saving reliable delivery state across restarts of Endpoints
val receiveBuffers = new ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]()
val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()
def receive = {
case Listen(addressesPromise)