diff --git a/akka-docs/rst/java/remoting.rst b/akka-docs/rst/java/remoting.rst index fcafc352d0..c17a977656 100644 --- a/akka-docs/rst/java/remoting.rst +++ b/akka-docs/rst/java/remoting.rst @@ -377,6 +377,9 @@ contains the addresses the remoting listens on. To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``. +To be notified when the current system is quarantined by the remote system, listen to ``ThisActorSystemQuarantinedEvent``, +which includes the addresses of local and remote ActorSystems. + To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause. Remote Security diff --git a/akka-docs/rst/scala/remoting.rst b/akka-docs/rst/scala/remoting.rst index 840090c2b8..9f9739337c 100644 --- a/akka-docs/rst/scala/remoting.rst +++ b/akka-docs/rst/scala/remoting.rst @@ -379,6 +379,9 @@ holds the direction of the association (inbound or outbound), the addresses of t To be notified when the remoting subsystem is ready to accept associations, listen to ``RemotingListenEvent`` which contains the addresses the remoting listens on. +To be notified when the current system is quarantined by the remote system, listen to ``ThisActorSystemQuarantinedEvent``, +which includes the addresses of local and remote ActorSystems. + To be notified when the remoting subsystem has been shut down, listen to ``RemotingShutdownEvent``. To intercept generic remoting related errors, listen to ``RemotingErrorEvent`` which holds the ``Throwable`` cause. diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index 896c25edc3..b3523e8650 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -100,6 +100,7 @@ abstract class RemoteRestartedQuarantinedSpec runOn(second) { val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val firstAddress = node(first).address + system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) val (_, ref) = identifyWithUid(first, "subject") @@ -114,6 +115,10 @@ abstract class RemoteRestartedQuarantinedSpec } } + expectMsgPF(10 seconds) { + case ThisActorSystemQuarantinedEvent(local, remote) ⇒ + } + enterBarrier("still-quarantined") system.awaitTermination(10.seconds) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 18c9d35cac..58eb726372 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -129,8 +129,11 @@ private[remote] final case class ShutDownAssociation(localAddress: Address, remo /** * INTERNAL API */ -@SerialVersionUID(1L) -private[remote] final case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) +@SerialVersionUID(2L) +private[remote] final case class InvalidAssociation(localAddress: Address, + remoteAddress: Address, + cause: Throwable, + disassociationInfo: Option[DisassociateInfo] = None) extends EndpointException("Invalid address: " + remoteAddress, cause) with AssociationProblem /** @@ -1000,7 +1003,8 @@ private[remote] class EndpointReader( localAddress, remoteAddress, InvalidAssociationException("The remote system has quarantined this system. No further associations " + - "to the remote system are possible until this system is restarted.")) + "to the remote system are possible until this system is restarted."), + Some(AssociationHandle.Quarantined)) } private def deliverAndAck(): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 445df3ee84..9144283f8f 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -441,7 +441,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { - case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒ + case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) ⇒ keepQuarantinedOr(remoteAddress) { val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]" log.warning("Tried to associate with unreachable remote address [{}]. " + @@ -450,6 +450,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) } + disassiciationInfo.foreach { + case AssociationHandle.Quarantined ⇒ + context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress)) + case _ ⇒ // do nothing + } Stop case ShutDownAssociation(localAddress, remoteAddress, _) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index e72a8d6221..c1b7334ee8 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -3,6 +3,7 @@ */ package akka.remote +import akka.event.Logging.LogLevel import akka.event.{ LoggingAdapter, Logging } import akka.actor.{ ActorSystem, Address } @@ -87,6 +88,12 @@ final case class QuarantinedEvent(address: Address, uid: Int) extends RemotingLi "from this situation." } +@SerialVersionUID(1L) +final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAddress: Address) extends RemotingLifecycleEvent { + override def logLevel: LogLevel = Logging.WarningLevel + override val toString: String = s"The remote system ${remoteAddress} has quarantined this system ${localAddress}." +} + /** * INTERNAL API */ diff --git a/project/MiMa.scala b/project/MiMa.scala index 0b7dbacf19..33cc2f50f2 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -591,7 +591,13 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"), ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="), ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"), - ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props") + ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props"), + + // report invalid association events #18758 + ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"), + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"), + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.copy"), + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this") ) ) }