Add named exception to detect when a cluster node has been quarantined by others #18758

Publish appropriate events to the current ActorSystem event stream upon remote ActorSystem shutdown or when current ActorSystem is quarantined by the remote ActorSystem.
This commit is contained in:
Eugene Dzhurinsky 2015-12-06 20:14:44 -05:00
parent 9d71142748
commit fb763040f2
7 changed files with 38 additions and 5 deletions

View file

@ -377,6 +377,9 @@ contains the addresses the remoting listens on.
To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``. To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``.
To be notified when the current system is quarantined by the remote system, listen to ``ThisActorSystemQuarantinedEvent``,
which includes the addresses of local and remote ActorSystems.
To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause. To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause.
Remote Security Remote Security

View file

@ -379,6 +379,9 @@ holds the direction of the association (inbound or outbound), the addresses of t
To be notified when the remoting subsystem is ready to accept associations, listen to ``RemotingListenEvent`` which To be notified when the remoting subsystem is ready to accept associations, listen to ``RemotingListenEvent`` which
contains the addresses the remoting listens on. contains the addresses the remoting listens on.
To be notified when the current system is quarantined by the remote system, listen to ``ThisActorSystemQuarantinedEvent``,
which includes the addresses of local and remote ActorSystems.
To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``. To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``.
To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause. To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause.

View file

@ -100,6 +100,7 @@ abstract class RemoteRestartedQuarantinedSpec
runOn(second) { runOn(second) {
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val firstAddress = node(first).address val firstAddress = node(first).address
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
val (_, ref) = identifyWithUid(first, "subject") val (_, ref) = identifyWithUid(first, "subject")
@ -114,6 +115,10 @@ abstract class RemoteRestartedQuarantinedSpec
} }
} }
expectMsgPF(10 seconds) {
case ThisActorSystemQuarantinedEvent(local, remote)
}
enterBarrier("still-quarantined") enterBarrier("still-quarantined")
system.awaitTermination(10.seconds) system.awaitTermination(10.seconds)

View file

@ -129,8 +129,11 @@ private[remote] final case class ShutDownAssociation(localAddress: Address, remo
/** /**
* INTERNAL API * INTERNAL API
*/ */
@SerialVersionUID(1L) @SerialVersionUID(2L)
private[remote] final case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) private[remote] final case class InvalidAssociation(localAddress: Address,
remoteAddress: Address,
cause: Throwable,
disassociationInfo: Option[DisassociateInfo] = None)
extends EndpointException("Invalid address: " + remoteAddress, cause) with AssociationProblem extends EndpointException("Invalid address: " + remoteAddress, cause) with AssociationProblem
/** /**
@ -1000,7 +1003,8 @@ private[remote] class EndpointReader(
localAddress, localAddress,
remoteAddress, remoteAddress,
InvalidAssociationException("The remote system has quarantined this system. No further associations " + InvalidAssociationException("The remote system has quarantined this system. No further associations " +
"to the remote system are possible until this system is restarted.")) "to the remote system are possible until this system is restarted."),
Some(AssociationHandle.Quarantined))
} }
private def deliverAndAck(): Unit = { private def deliverAndAck(): Unit = {

View file

@ -441,7 +441,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override val supervisorStrategy = override val supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) { OneForOneStrategy(loggingEnabled = false) {
case e @ InvalidAssociation(localAddress, remoteAddress, reason) case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo)
keepQuarantinedOr(remoteAddress) { keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]" val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
log.warning("Tried to associate with unreachable remote address [{}]. " + log.warning("Tried to associate with unreachable remote address [{}]. " +
@ -450,6 +450,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy) remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
} }
disassiciationInfo.foreach {
case AssociationHandle.Quarantined
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
case _ // do nothing
}
Stop Stop
case ShutDownAssociation(localAddress, remoteAddress, _) case ShutDownAssociation(localAddress, remoteAddress, _)

View file

@ -3,6 +3,7 @@
*/ */
package akka.remote package akka.remote
import akka.event.Logging.LogLevel
import akka.event.{ LoggingAdapter, Logging } import akka.event.{ LoggingAdapter, Logging }
import akka.actor.{ ActorSystem, Address } import akka.actor.{ ActorSystem, Address }
@ -87,6 +88,12 @@ final case class QuarantinedEvent(address: Address, uid: Int) extends RemotingLi
"from this situation." "from this situation."
} }
@SerialVersionUID(1L)
final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAddress: Address) extends RemotingLifecycleEvent {
override def logLevel: LogLevel = Logging.WarningLevel
override val toString: String = s"The remote system ${remoteAddress} has quarantined this system ${localAddress}."
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -591,7 +591,13 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"), ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="), ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"), ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props") ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props"),
// report invalid association events #18758
ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"),
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"),
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.copy"),
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this")
) )
) )
} }