+rem: Implement Quarantine piercing

- Added refuseUid support in Akka protocol and EndpointManager
 - The AkkaProtocolTransport interface is now a first-class citizen in remoting and endpoint actors
 - The AkkaProtocolTransport interface is now a first-class citizen in endpoint actors
This commit is contained in:
Endre Sándor Varga 2013-11-26 15:25:05 +01:00
parent 3fdf20dfc6
commit c9fcc5eb5c
5 changed files with 287 additions and 88 deletions

View file

@ -6,13 +6,11 @@ package akka.remote
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util.immutableSeq
import akka.pattern.{ AskTimeoutException, gracefulStop, pipe, ask }
import akka.pattern.{ gracefulStop, pipe, ask }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import akka.util.Timeout
import com.typesafe.config.Config
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
@ -55,7 +53,7 @@ private[remote] object Remoting {
final val EndpointManagerName = "endpointManager"
def localAddressForRemote(transportMapping: Map[String, Set[(Transport, Address)]], remote: Address): Address = {
def localAddressForRemote(transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]], remote: Address): Address = {
transportMapping.get(remote.protocol) match {
case Some(transports)
@ -106,7 +104,7 @@ private[remote] object Remoting {
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
@volatile private var endpointManager: Option[ActorRef] = None
@volatile private var transportMapping: Map[String, Set[(Transport, Address)]] = _
@volatile private var transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]] = _
// This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception
// handling.
@volatile var addresses: Set[Address] = _
@ -167,10 +165,10 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager = Some(manager)
try {
val addressesPromise: Promise[Seq[(Transport, Address)]] = Promise()
val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
manager ! Listen(addressesPromise)
val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future,
val transports: Seq[(AkkaProtocolTransport, Address)] = Await.result(addressesPromise.future,
StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)
@ -234,7 +232,7 @@ private[remote] object EndpointManager {
// Messages between Remoting and EndpointManager
sealed trait RemotingCommand extends NoSerializationVerificationNeeded
case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand
case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand
case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
@ -251,10 +249,10 @@ private[remote] object EndpointManager {
// Messages internal to EndpointManager
case object Prune extends NoSerializationVerificationNeeded
case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]],
results: Seq[(Transport, Address, Promise[AssociationEventListener])])
case class ListensResult(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]],
results: Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])])
extends NoSerializationVerificationNeeded
case class ListensFailure(addressesPromise: Promise[Seq[(Transport, Address)]], cause: Throwable)
case class ListensFailure(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]], cause: Throwable)
extends NoSerializationVerificationNeeded
// Helper class to store address pairs
@ -382,7 +380,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// will be not part of this map!
val endpoints = new EndpointRegistry
// Mapping between transports and the local addresses they listen to
var transportMapping: Map[Address, Transport] = Map()
var transportMapping: Map[Address, AkkaProtocolTransport] = Map()
def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
val pruneInterval: FiniteDuration = if (retryGateEnabled) settings.RetryGateClosedFor * 2 else Duration.Zero
@ -394,10 +392,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override val supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, _)
case e @ InvalidAssociation(localAddress, remoteAddress, reason)
log.warning("Tried to associate with unreachable remote address [{}]. " +
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
remoteAddress, settings.UnknownAddressGateClosedFor.toMillis)
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
remoteAddress, settings.UnknownAddressGateClosedFor.toMillis, reason.getMessage)
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
context.system.eventStream.publish(AddressTerminated(remoteAddress))
Stop
@ -468,7 +466,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case ia: InboundAssociation
context.system.scheduler.scheduleOnce(10.milliseconds, self, ia)
case ManagementCommand(_)
sender ! ManagementCommandAck(false)
sender ! ManagementCommandAck(status = false)
case StartupFinished
context.become(accepting)
case ShutdownAndFlush
@ -504,25 +502,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case s @ Send(message, senderOption, recipientRef, _)
val recipientAddress = recipientRef.path.address
def createAndRegisterWritingEndpoint(): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint(
recipientAddress,
recipientRef.localAddressToUse,
transportMapping(recipientRef.localAddressToUse),
settings,
handleOption = None,
writing = true))
def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
endpoints.registerWritableEndpoint(
recipientAddress,
createEndpoint(
recipientAddress,
recipientRef.localAddressToUse,
transportMapping(recipientRef.localAddressToUse),
settings,
handleOption = None,
writing = true,
refuseUid))
endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
case Some(Pass(endpoint))
endpoint ! s
case Some(Gated(timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
else extendedSystem.deadLetters ! s
case Some(Quarantined(uid, timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
else extendedSystem.deadLetters ! s
case Some(Quarantined(uid, _))
// timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
// the Quarantined tombstone and we know what UID we don't want to accept, so use it.
createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s
case None
createAndRegisterWritingEndpoint() ! s
createAndRegisterWritingEndpoint(refuseUid = None) ! s
}
@ -541,14 +544,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
ep ! EndpointWriter.StopReading(ep)
case _
val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true))
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
val endpoint = createEndpoint(
handle.remoteAddress,
handle.localAddress,
transportMapping(handle.localAddress),
settings,
Some(handle),
writing)
writing,
refuseUid = None)
if (writing)
endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint)
else {
@ -576,7 +580,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully)
def shutdownAll[T](resources: TraversableOnce[T])(shutdown: T Future[Boolean]): Future[Boolean] = {
(Future sequence resources.map(shutdown(_))) map { _.foldLeft(true) { _ && _ } } recover {
(Future sequence resources.map(shutdown)) map { _.forall(identity) } recover {
case NonFatal(_) false
}
}
@ -600,7 +604,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(_) // why should we care now?
}
private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {
private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
/*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
* like the following:
@ -619,9 +623,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
.createInstanceFor[Transport](fqn, args).recover({
case exception throw new IllegalArgumentException(
(s"Cannot instantiate transport [$fqn]. " +
s"Cannot instantiate transport [$fqn]. " +
"Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
"[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters"), exception)
"[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters", exception)
}).get
@ -629,7 +633,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// The chain at this point:
// Adapter <- ... <- Adapter <- Driver
val wrappedTransport =
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) {
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider }.foldLeft(driver) {
(t: Transport, provider: TransportAdapterProvider)
// The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])
@ -651,14 +655,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
if (pendingReadHandoffs.contains(takingOverFrom)) {
val handle = pendingReadHandoffs(takingOverFrom)
pendingReadHandoffs -= takingOverFrom
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true))
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
val endpoint = createEndpoint(
handle.remoteAddress,
handle.localAddress,
transportMapping(handle.localAddress),
settings,
Some(handle),
writing = false)
writing = false,
refuseUid = None)
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
}
}
@ -670,16 +675,19 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
private def createEndpoint(remoteAddress: Address,
localAddress: Address,
transport: Transport,
transport: AkkaProtocolTransport,
endpointSettings: RemoteSettings,
handleOption: Option[AkkaProtocolHandle],
writing: Boolean): ActorRef = {
writing: Boolean,
refuseUid: Option[Int]): ActorRef = {
assert(transportMapping contains localAddress)
assert(writing || refuseUid.isEmpty)
if (writing) context.watch(context.actorOf(RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props(
handleOption,
localAddress,
remoteAddress,
refuseUid,
transport,
endpointSettings,
AkkaPduProtobufCodec,
@ -689,6 +697,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
handleOption,
localAddress,
remoteAddress,
refuseUid,
transport,
endpointSettings,
AkkaPduProtobufCodec,