From cc159195128c5bd896b9928bf461c858551313b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 19 Aug 2013 15:34:24 +0200 Subject: [PATCH] =rem #3527 Throw away system message delivery state when new UID from a remote Address is detected --- .../src/main/scala/akka/remote/Endpoint.scala | 77 ++++++++++++------- .../src/main/scala/akka/remote/Remoting.scala | 4 +- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 209079aae0..17d58f5927 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -10,8 +10,7 @@ import akka.actor._ import akka.dispatch.sysmsg.SystemMessage import akka.event.LoggingAdapter import akka.pattern.pipe -import akka.remote.EndpointManager.Link -import akka.remote.EndpointManager.Send +import akka.remote.EndpointManager.{ ResendState, Link, Send } import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop } import akka.remote.WireFormats.SerializedMessage import akka.remote.transport.AkkaPduCodec.Message @@ -157,7 +156,7 @@ private[remote] object ReliableDeliverySupervisor { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props = + receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props = Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings, codec, receiveBuffers) } @@ -172,7 +171,7 @@ private[remote] class ReliableDeliverySupervisor( val transport: Transport, val settings: RemoteSettings, val codec: AkkaPduCodec, - val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor { + val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor { import ReliableDeliverySupervisor._ import context.dispatcher @@ -194,17 +193,25 @@ private[remote] class ReliableDeliverySupervisor( } var currentHandle: Option[AkkaProtocolHandle] = handleOrActive - var resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize) - var resendDeadline = Deadline.now + settings.SysResendTimeout - var lastCumulativeAck = SeqNo(-1) - val nextSeq = { - var seqCounter: Long = 0L - () ⇒ { - val tmp = seqCounter - seqCounter += 1 - SeqNo(tmp) - } + var resendBuffer: AckedSendBuffer[Send] = _ + var resendDeadline: Deadline = _ + var lastCumulativeAck: SeqNo = _ + var seqCounter: Long = _ + + def reset() { + resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize) + resendDeadline = Deadline.now + settings.SysResendTimeout + lastCumulativeAck = SeqNo(-1) + seqCounter = 0L + } + + reset() + + def nextSeq(): SeqNo = { + val tmp = seqCounter + seqCounter += 1 + SeqNo(tmp) } var writer: ActorRef = createWriter() @@ -235,7 +242,12 @@ private[remote] class ReliableDeliverySupervisor( case s: Send ⇒ handleSend(s) case ack: Ack ⇒ - resendBuffer = resendBuffer.acknowledge(ack) + try resendBuffer = resendBuffer.acknowledge(ack) + catch { + case NonFatal(e) ⇒ + throw new InvalidAssociationException("Error encountered while processing system message acknowledgement", e) + } + if (lastCumulativeAck < ack.cumulativeAck) { resendDeadline = Deadline.now + settings.SysResendTimeout lastCumulativeAck = ack.cumulativeAck @@ -250,7 +262,10 @@ private[remote] class ReliableDeliverySupervisor( if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) - case GotUid(u) ⇒ uid = Some(u) + case GotUid(u) ⇒ + // New system that has the same address as the old - need to start from fresh state + if (uid.isDefined && uid.get != u) reset() + uid = Some(u) case s: EndpointWriter.StopReading ⇒ writer forward s } @@ -358,7 +373,7 @@ private[remote] object EndpointWriter { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], + receiveBuffers: ConcurrentHashMap[Link, ResendState], reliableDeliverySupervisor: Option[ActorRef]): Props = Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec, receiveBuffers, reliableDeliverySupervisor) @@ -398,7 +413,7 @@ private[remote] class EndpointWriter( transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], + val receiveBuffers: ConcurrentHashMap[Link, ResendState], val reliableDeliverySupervisor: Option[ActorRef]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash with FSM[EndpointWriter.State, Unit] { @@ -603,7 +618,7 @@ private[remote] class EndpointWriter( val newReader = context.watch(context.actorOf( RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec, - msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local), + msgDispatch, inbound, handle.handshakeInfo.uid, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) Some(newReader) @@ -633,10 +648,11 @@ private[remote] object EndpointReader { codec: AkkaPduCodec, msgDispatch: InboundMessageDispatcher, inbound: Boolean, + uid: Int, reliableDeliverySupervisor: Option[ActorRef], - receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props = + receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props = Props(classOf[EndpointReader], localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound, - reliableDeliverySupervisor, receiveBuffers) + uid, reliableDeliverySupervisor, receiveBuffers) } @@ -651,8 +667,9 @@ private[remote] class EndpointReader( codec: AkkaPduCodec, msgDispatch: InboundMessageDispatcher, val inbound: Boolean, + val uid: Int, val reliableDeliverySupervisor: Option[ActorRef], - val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) { + val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) { import EndpointWriter.{ OutboundAck, StopReading, StoppedReading } @@ -662,20 +679,26 @@ private[remote] class EndpointReader( override def preStart(): Unit = { receiveBuffers.get(Link(localAddress, remoteAddress)) match { case null ⇒ - case buf ⇒ - ackedReceiveBuffer = buf + case ResendState(`uid`, buffer) ⇒ + ackedReceiveBuffer = buffer deliverAndAck() + case _ ⇒ } } override def postStop(): Unit = saveState() def saveState(): Unit = { + def merge(currentState: ResendState, oldState: ResendState): ResendState = + if (currentState.uid == oldState.uid) ResendState(uid, oldState.buffer.mergeFrom(currentState.buffer)) + else currentState + @tailrec - def updateSavedState(key: Link, expectedState: AckedReceiveBuffer[Message]): Unit = { + def updateSavedState(key: Link, expectedState: ResendState): Unit = { if (expectedState eq null) { - if (receiveBuffers.putIfAbsent(key, ackedReceiveBuffer) ne null) updateSavedState(key, receiveBuffers.get(key)) - } else if (!receiveBuffers.replace(key, expectedState, expectedState.mergeFrom(ackedReceiveBuffer))) + if (receiveBuffers.putIfAbsent(key, ResendState(uid, ackedReceiveBuffer)) ne null) + updateSavedState(key, receiveBuffers.get(key)) + } else if (!receiveBuffers.replace(key, expectedState, merge(ResendState(uid, ackedReceiveBuffer), expectedState))) updateSavedState(key, receiveBuffers.get(key)) } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index c4dc61a10f..b6836f278b 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -258,6 +258,8 @@ private[remote] object EndpointManager { // Helper class to store address pairs case class Link(localAddress: Address, remoteAddress: Address) + case class ResendState(uid: Int, buffer: AckedReceiveBuffer[Message]) + sealed trait EndpointPolicy { /** @@ -429,7 +431,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } // Structure for saving reliable delivery state across restarts of Endpoints - val receiveBuffers = new ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]() + val receiveBuffers = new ConcurrentHashMap[Link, ResendState]() def receive = { case Listen(addressesPromise) ⇒