=rem #16505: Do not publish AddressTerminated

- also not throw IllegalAssocEx from transport
 - added a test for stashing behavior
(cherry picked from commit b7295a8)
This commit is contained in:
Endre Sándor Varga 2015-02-17 13:40:48 +01:00
parent 7a0345ca0e
commit b3f4012746
4 changed files with 180 additions and 33 deletions

View file

@ -166,6 +166,9 @@ private[remote] object ReliableDeliverySupervisor {
case object AttemptSysMsgRedelivery
final case class GotUid(uid: Int, remoteAddres: Address)
case object IsIdle
case object Idle
def props(
handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address,
@ -272,6 +275,7 @@ private[remote] class ReliableDeliverySupervisor(
resendAll()
writer ! FlushAndStop
context.become(flushWait)
case IsIdle // Do not reply, we will Terminate soon, or send a GotUid
case s: Send
handleSend(s)
case ack: Ack
@ -311,6 +315,7 @@ private[remote] class ReliableDeliverySupervisor(
def gated: Receive = {
case Terminated(_)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
case IsIdle sender() ! Idle
case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
// If we talk to a system we have not talked to before (or has given up talking to in the past) stop
@ -335,6 +340,7 @@ private[remote] class ReliableDeliverySupervisor(
}
def idle: Receive = {
case IsIdle sender() ! Idle
case s: Send
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
@ -352,6 +358,7 @@ private[remote] class ReliableDeliverySupervisor(
}
def flushWait: Receive = {
case IsIdle // Do not reply, we will Terminate soon, which will do the inbound connection unstashing
case Terminated(_)
// Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
// and don't really know if they were properly delivered or not.

View file

@ -22,7 +22,6 @@ import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic
/**
* INTERNAL API
@ -421,10 +420,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
def handleStashedInbound(endpoint: ActorRef) {
def handleStashedInbound(endpoint: ActorRef, writerIsIdle: Boolean) {
val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
stashedInbound -= endpoint
stashed foreach (handleInboundAssociation _)
stashed foreach (handleInboundAssociation(_, writerIsIdle))
}
def keepQuarantinedOr(remoteAddress: Address)(body: Unit): Unit = endpoints.refuseUid(remoteAddress) match {
@ -446,7 +445,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
}
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop
case ShutDownAssociation(localAddress, remoteAddress, _)
@ -456,7 +454,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
}
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason)
@ -468,7 +465,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
case _ // disabled
}
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop
case HopelessAssociation(localAddress, remoteAddress, None, _)
@ -478,7 +474,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
}
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop
case NonFatal(e)
@ -589,18 +584,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
case ia @ InboundAssociation(handle: AkkaProtocolHandle)
handleInboundAssociation(ia)
handleInboundAssociation(ia, writerIsIdle = false)
case EndpointWriter.StoppedReading(endpoint)
acceptPendingReader(takingOverFrom = endpoint)
case Terminated(endpoint)
acceptPendingReader(takingOverFrom = endpoint)
endpoints.unregisterEndpoint(endpoint)
handleStashedInbound(endpoint)
handleStashedInbound(endpoint, writerIsIdle = false)
case EndpointWriter.TookOver(endpoint, handle)
removePendingReader(takingOverFrom = endpoint, withHandle = handle)
case ReliableDeliverySupervisor.GotUid(uid, remoteAddress)
endpoints.registerWritableEndpointUid(remoteAddress, uid)
handleStashedInbound(sender)
handleStashedInbound(sender(), writerIsIdle = false)
case ReliableDeliverySupervisor.Idle
handleStashedInbound(sender(), writerIsIdle = true)
case Prune
endpoints.prune()
case ShutdownAndFlush
@ -631,7 +628,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(_) // why should we care now?
}
def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint)
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
@ -642,7 +639,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
handle.disassociate(AssociationHandle.Quarantined)
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(ep, None, _))
stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
// Idle writer will never send a GotUid or a Terminated so we need to "provoke it"
// to get an unstash event
if (!writerIsIdle) {
ep ! ReliableDeliverySupervisor.IsIdle
stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
} else
createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
case Some(Pass(ep, Some(uid), _))
if (handle.handshakeInfo.uid == uid) {
pendingReadHandoffs.get(ep) foreach (_.disassociate())

View file

@ -468,8 +468,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture
} yield handle) recover {
case c: CancellationException throw new NettyTransportException("Connection was cancelled") with NoStackTrace
case u @ (_: UnknownHostException | _: SecurityException | _: ConnectException) throw new InvalidAssociationException(u.getMessage, u.getCause)
case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
}
}
}