Quarantining completely disables outbound attempts #3285
- also a bugfix for use-passive = off
This commit is contained in:
parent
beed693c1b
commit
c36ff9717d
2 changed files with 20 additions and 30 deletions
|
|
@ -154,10 +154,9 @@ private[remote] object ReliableDeliverySupervisor {
|
|||
transport: Transport,
|
||||
settings: RemoteSettings,
|
||||
codec: AkkaPduCodec,
|
||||
refuseUid: Option[Int],
|
||||
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
|
||||
Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings,
|
||||
codec, refuseUid, receiveBuffers)
|
||||
codec, receiveBuffers)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -170,7 +169,6 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
val transport: Transport,
|
||||
val settings: RemoteSettings,
|
||||
val codec: AkkaPduCodec,
|
||||
val refuseUid: Option[Int],
|
||||
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor {
|
||||
import ReliableDeliverySupervisor._
|
||||
|
||||
|
|
@ -308,7 +306,6 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
transport = transport,
|
||||
settings = settings,
|
||||
AkkaPduProtobufCodec,
|
||||
refuseUid,
|
||||
receiveBuffers = receiveBuffers,
|
||||
reliableDeliverySupervisor = Some(self))
|
||||
.withDispatcher("akka.remote.writer-dispatcher"),
|
||||
|
|
@ -349,11 +346,10 @@ private[remote] object EndpointWriter {
|
|||
transport: Transport,
|
||||
settings: RemoteSettings,
|
||||
codec: AkkaPduCodec,
|
||||
refuseUid: Option[Int],
|
||||
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
|
||||
reliableDeliverySupervisor: Option[ActorRef]): Props =
|
||||
Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec,
|
||||
refuseUid, receiveBuffers, reliableDeliverySupervisor)
|
||||
receiveBuffers, reliableDeliverySupervisor)
|
||||
|
||||
/**
|
||||
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
|
||||
|
|
@ -388,7 +384,6 @@ private[remote] class EndpointWriter(
|
|||
transport: Transport,
|
||||
settings: RemoteSettings,
|
||||
codec: AkkaPduCodec,
|
||||
val refuseUid: Option[Int],
|
||||
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
|
||||
val reliableDeliverySupervisor: Option[ActorRef])
|
||||
extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash
|
||||
|
|
@ -455,19 +450,8 @@ private[remote] class EndpointWriter(
|
|||
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||
case Event(Status.Failure(e), _) ⇒
|
||||
refuseUid match {
|
||||
case Some(uid) ⇒
|
||||
// don't try again when endpoint is quarantined
|
||||
publishAndThrow(new QuarantinedUidException(uid, remoteAddress))
|
||||
case None ⇒
|
||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
||||
}
|
||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
||||
case Event(inboundHandle: AkkaProtocolHandle, _) ⇒
|
||||
refuseUid match {
|
||||
case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒
|
||||
publishAndThrow(new QuarantinedUidException(inboundHandle.handshakeInfo.uid, inboundHandle.remoteAddress))
|
||||
case _ ⇒ // Everything is fine
|
||||
}
|
||||
// Assert handle == None?
|
||||
context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid)
|
||||
handle = Some(inboundHandle)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue