diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index cca43e2874..e728234edf 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -154,10 +154,9 @@ private[remote] object ReliableDeliverySupervisor { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - refuseUid: Option[Int], receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props = Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings, - codec, refuseUid, receiveBuffers) + codec, receiveBuffers) } /** @@ -170,7 +169,6 @@ private[remote] class ReliableDeliverySupervisor( val transport: Transport, val settings: RemoteSettings, val codec: AkkaPduCodec, - val refuseUid: Option[Int], val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor { import ReliableDeliverySupervisor._ @@ -308,7 +306,6 @@ private[remote] class ReliableDeliverySupervisor( transport = transport, settings = settings, AkkaPduProtobufCodec, - refuseUid, receiveBuffers = receiveBuffers, reliableDeliverySupervisor = Some(self)) .withDispatcher("akka.remote.writer-dispatcher"), @@ -349,11 +346,10 @@ private[remote] object EndpointWriter { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - refuseUid: Option[Int], receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], reliableDeliverySupervisor: Option[ActorRef]): Props = Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec, - refuseUid, receiveBuffers, reliableDeliverySupervisor) + receiveBuffers, reliableDeliverySupervisor) /** * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is @@ -388,7 +384,6 @@ private[remote] class EndpointWriter( transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, - val refuseUid: Option[Int], val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], val reliableDeliverySupervisor: Option[ActorRef]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash @@ -455,19 +450,8 @@ private[remote] class EndpointWriter( case Event(Status.Failure(e: InvalidAssociationException), _) ⇒ publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) case Event(Status.Failure(e), _) ⇒ - refuseUid match { - case Some(uid) ⇒ - // don't try again when endpoint is quarantined - publishAndThrow(new QuarantinedUidException(uid, remoteAddress)) - case None ⇒ - publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) - } + publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) case Event(inboundHandle: AkkaProtocolHandle, _) ⇒ - refuseUid match { - case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒ - publishAndThrow(new QuarantinedUidException(inboundHandle.handshakeInfo.uid, inboundHandle.remoteAddress)) - case _ ⇒ // Everything is fine - } // Assert handle == None? context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) handle = Some(inboundHandle) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 9d1d0e4a1d..90401ca176 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -332,6 +332,9 @@ private[remote] object EndpointManager { def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = addressToWritable += address -> Quarantined(uid, timeOfRelease) + def removePolicy(address: Address): Unit = + addressToWritable -= address + def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys def prune(): Unit = { @@ -479,26 +482,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address - def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint( + def createAndRegisterWritingEndpoint(): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, handleOption = None, - refuseUid, writing = true)) endpoints.writableEndpointWithPolicyFor(recipientAddress) match { case Some(Pass(endpoint)) ⇒ endpoint ! s case Some(Gated(timeOfRelease)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(None) ! s + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s else extendedSystem.deadLetters ! s case Some(Quarantined(uid, timeOfRelease)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(None) ! s - else createAndRegisterWritingEndpoint(Some(uid)) ! s + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s + else extendedSystem.deadLetters ! s case None ⇒ - createAndRegisterWritingEndpoint(None) ! s + createAndRegisterWritingEndpoint() ! s } @@ -515,12 +517,19 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transportMapping(handle.localAddress), settings, Some(handle), - refuseUid = None, writing) if (writing) endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint) - else + else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) + endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { + case Some(Pass(_)) ⇒ // Leave it alone + case _ ⇒ + // Since we just communicated with the guy we can lift gate, quarantine, etc. New writer will be + // opened at first write. + endpoints.removePolicy(handle.remoteAddress) + } + } } } case Terminated(endpoint) ⇒ @@ -605,7 +614,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport: Transport, endpointSettings: RemoteSettings, handleOption: Option[AkkaProtocolHandle], - refuseUid: Option[Int], writing: Boolean): ActorRef = { assert(transportMapping contains localAddress) @@ -616,7 +624,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec, - refuseUid, receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) else context.watch(context.actorOf(EndpointWriter( @@ -626,7 +633,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec, - refuseUid, receiveBuffers, reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))