Added flushing shutdown to Remoting

This commit is contained in:
Endre Sándor Varga 2012-12-13 14:27:34 +01:00
parent ddcb2192eb
commit eb3a0de01b

View file

@ -4,22 +4,21 @@ import scala.language.postfixOps
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.{ gracefulStop, pipe }
import akka.remote.EndpointManager.{ StartupFinished, ManagementCommand, Listen, Send }
import akka.remote.transport.Transport.{ AssociationEventListener, InboundAssociation }
import akka.japi.Util.immutableSeq
import akka.pattern.{ gracefulStop, pipe, ask }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.AssociationEventListener
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.transport._
import akka.util.Timeout
import com.typesafe.config.{ ConfigFactory, Config }
import com.typesafe.config.Config
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.collection.immutable.{ Seq, HashMap }
import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await, Future }
import scala.util.control.NonFatal
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.util.{ Failure, Success }
import scala.collection.immutable
import akka.japi.Util.immutableSeq
import akka.remote.Remoting.{ TransportSupervisor, RegisterTransportActor }
class RemotingSettings(val config: Config) {
@ -30,6 +29,8 @@ class RemotingSettings(val config: Config) {
val ShutdownTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS)
val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remoting.flush-wait-on-shutdown"), MILLISECONDS)
val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS)
val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for")
@ -145,11 +146,13 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager match {
case Some(manager)
try {
val stopped: Future[Boolean] = gracefulStop(manager, settings.ShutdownTimeout)(system)
implicit val timeout = new Timeout(settings.ShutdownTimeout)
val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean]
if (Await.result(stopped, settings.ShutdownTimeout)) {
eventPublisher.notifyListeners(RemotingShutdownEvent)
}
if (!Await.result(stopped, settings.ShutdownTimeout))
log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " +
"Increase akka.remoting.flush-wait-on-shutdown to a larger value to avoid this message.")
eventPublisher.notifyListeners(RemotingShutdownEvent)
} catch {
case e: TimeoutException notifyError("Shutdown timed out.", e)
@ -231,16 +234,18 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
private[remote] object EndpointManager {
// Messages between Remoting and EndpointManager
sealed trait RemotingCommand
case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand
case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand {
override def toString = s"Remote message $senderOption -> $recipient"
}
case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand
// Messages internal to EndpointManager
case object Prune
case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]],
results: Seq[(Transport, Address, Promise[AssociationEventListener])])
@ -249,8 +254,6 @@ private[remote] object EndpointManager {
case class Gated(timeOfFailure: Long) extends EndpointPolicy
case class Quarantined(reason: Throwable) extends EndpointPolicy
case object Prune
// Not threadsafe -- only to be used in HeadActor
private[EndpointManager] class EndpointRegistry {
private var addressToEndpointAndPolicy = HashMap[Address, EndpointPolicy]()
@ -311,6 +314,8 @@ private[remote] object EndpointManager {
addressToPassive = addressToPassive - address
}
}
def allEndpoints: Iterable[ActorRef] = endpointToAddress.keys
}
}
@ -358,7 +363,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def receive = {
case Listen(addressesPromise)
listens map { ListensResult(addressesPromise, _) } pipeTo self
case ListensResult(addressesPromise, results)
transportMapping = results.groupBy {
case (_, transportAddress, _) transportAddress
@ -374,10 +378,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
transport -> address
}
addressesPromise.success(transportsAndAddresses)
case ManagementCommand(_, statusPromise) statusPromise.success(false)
case StartupFinished context.become(accepting)
case ManagementCommand(_, statusPromise)
statusPromise.success(false)
case StartupFinished
context.become(accepting)
case ShutdownAndFlush
sender ! true
context.stop(self) // Nothing to flush at this point
}
val accepting: Receive = {
@ -398,8 +405,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
None)
endpoints.registerActiveEndpoint(recipientAddress, endpoint)
endpoint ! s
} else extendedSystem.deadLetters forward message
case Some(Quarantined(_)) extendedSystem.deadLetters forward message
} else forwardToDeadLetters(s)
case Some(Quarantined(_)) forwardToDeadLetters(s)
case None
val endpoint = createEndpoint(
recipientAddress,
@ -428,8 +435,31 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.registerPassiveEndpoint(handle.remoteAddress, endpoint)
else handle.disassociate()
}
case Terminated(endpoint) endpoints.removeIfNotGated(endpoint)
case Prune endpoints.prune(settings.RetryGateClosedFor)
case Terminated(endpoint)
endpoints.removeIfNotGated(endpoint)
case Prune
endpoints.prune(settings.RetryGateClosedFor)
case ShutdownAndFlush
// Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully)
val sys = context.system // Avoid closing over context
Future sequence endpoints.allEndpoints.map {
gracefulStop(_, settings.FlushWait)(sys)
} map { _.foldLeft(true) { _ && _ } } pipeTo sender
// Ignore all other writes
context.become(flushing)
}
def flushing: Receive = {
case s: Send forwardToDeadLetters(s)
case InboundAssociation(h) h.disassociate()
}
private def forwardToDeadLetters(s: Send): Unit = {
val sender = s.senderOption match {
case Some(sender) sender
case None Actor.noSender
}
extendedSystem.deadLetters.tell(s.message, sender)
}
private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {