add reason param to quarantine method
This commit is contained in:
parent
9a7d79c882
commit
3b7a7dfa59
16 changed files with 28 additions and 23 deletions
|
|
@ -90,7 +90,7 @@ private[cluster] class ClusterRemoteWatcher(
|
||||||
if (m.address != selfAddress) {
|
if (m.address != selfAddress) {
|
||||||
clusterNodes -= m.address
|
clusterNodes -= m.address
|
||||||
if (previousStatus == MemberStatus.Down) {
|
if (previousStatus == MemberStatus.Down) {
|
||||||
quarantine(m.address, Some(m.uniqueAddress.uid))
|
quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed")
|
||||||
}
|
}
|
||||||
publishAddressTerminated(m.address)
|
publishAddressTerminated(m.address)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldK
|
||||||
enterBarrier("actor-identified")
|
enterBarrier("actor-identified")
|
||||||
|
|
||||||
// Manually Quarantine the other system
|
// Manually Quarantine the other system
|
||||||
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
|
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
|
||||||
|
|
||||||
// Quarantining is not immediate
|
// Quarantining is not immediate
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie
|
||||||
enterBarrier("actor-identified")
|
enterBarrier("actor-identified")
|
||||||
|
|
||||||
// Manually Quarantine the other system
|
// Manually Quarantine the other system
|
||||||
RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst))
|
RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst), "test")
|
||||||
|
|
||||||
// Quarantine is up -- Cannot communicate with remote system any more
|
// Quarantine is up -- Cannot communicate with remote system any more
|
||||||
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"
|
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,7 @@ abstract class RemoteRestartedQuarantinedSpec
|
||||||
|
|
||||||
val (uid, ref) = identifyWithUid(second, "subject")
|
val (uid, ref) = identifyWithUid(second, "subject")
|
||||||
|
|
||||||
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
|
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
|
||||||
|
|
||||||
enterBarrier("quarantined")
|
enterBarrier("quarantined")
|
||||||
enterBarrier("still-quarantined")
|
enterBarrier("still-quarantined")
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,7 @@ abstract class RemoteRestartedQuarantinedSpec
|
||||||
val (uid, ref) = identifyWithUid(second, "subject", 5.seconds)
|
val (uid, ref) = identifyWithUid(second, "subject", 5.seconds)
|
||||||
|
|
||||||
enterBarrier("before-quarantined")
|
enterBarrier("before-quarantined")
|
||||||
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
|
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
|
||||||
|
|
||||||
enterBarrier("quarantined")
|
enterBarrier("quarantined")
|
||||||
enterBarrier("still-quarantined")
|
enterBarrier("still-quarantined")
|
||||||
|
|
|
||||||
|
|
@ -449,7 +449,8 @@ private[akka] class RemoteActorRefProvider(
|
||||||
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
|
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
|
||||||
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
|
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
|
||||||
*/
|
*/
|
||||||
def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid)
|
def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
|
||||||
|
transport.quarantine(address, uid, reason)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -90,6 +90,6 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
|
||||||
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
|
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
|
||||||
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
|
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
|
||||||
*/
|
*/
|
||||||
def quarantine(address: Address, uid: Option[Int]): Unit
|
def quarantine(address: Address, uid: Option[Int], reason: String): Unit
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,7 @@ private[akka] class RemoteWatcher(
|
||||||
watchingNodes foreach { a ⇒
|
watchingNodes foreach { a ⇒
|
||||||
if (!unreachable(a) && !failureDetector.isAvailable(a)) {
|
if (!unreachable(a) && !failureDetector.isAvailable(a)) {
|
||||||
log.warning("Detected unreachable: [{}]", a)
|
log.warning("Detected unreachable: [{}]", a)
|
||||||
quarantine(a, addressUids.get(a))
|
quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector")
|
||||||
publishAddressTerminated(a)
|
publishAddressTerminated(a)
|
||||||
unreachable += a
|
unreachable += a
|
||||||
}
|
}
|
||||||
|
|
@ -167,8 +167,8 @@ private[akka] class RemoteWatcher(
|
||||||
def publishAddressTerminated(address: Address): Unit =
|
def publishAddressTerminated(address: Address): Unit =
|
||||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
|
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
|
||||||
|
|
||||||
def quarantine(address: Address, uid: Option[Int]): Unit =
|
def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
|
||||||
remoteProvider.quarantine(address, uid)
|
remoteProvider.quarantine(address, uid, reason)
|
||||||
|
|
||||||
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
|
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
|
||||||
assert(watcher != self)
|
assert(watcher != self)
|
||||||
|
|
@ -282,4 +282,4 @@ private[akka] class RemoteWatcher(
|
||||||
log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path)
|
log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path)
|
||||||
watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -225,7 +225,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
||||||
case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
|
case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match {
|
override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = endpointManager match {
|
||||||
case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
|
case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
|
||||||
case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
|
case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
|
||||||
s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
|
s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
|
||||||
|
|
|
||||||
|
|
@ -543,7 +543,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
private def startAeronErrorLog(): Unit = {
|
private def startAeronErrorLog(): Unit = {
|
||||||
val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE))
|
val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE))
|
||||||
val lastTimestamp = new AtomicLong(0L)
|
val lastTimestamp = new AtomicLong(0L)
|
||||||
import system.dispatcher // FIXME perhaps use another dispatcher for this
|
import system.dispatcher
|
||||||
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
|
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
|
||||||
if (!isShutdown) {
|
if (!isShutdown) {
|
||||||
val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get)
|
val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get)
|
||||||
|
|
@ -840,9 +840,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
|
private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
|
||||||
eventPublisher.notifyListeners(event)
|
eventPublisher.notifyListeners(event)
|
||||||
|
|
||||||
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
|
override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = {
|
||||||
// FIXME change the method signature (old remoting) to include reason and use Long uid?
|
// FIXME use Long uid
|
||||||
association(remoteAddress).quarantine(reason = "", uid.map(_.toLong))
|
association(remoteAddress).quarantine(reason, uid.map(_.toLong))
|
||||||
}
|
}
|
||||||
|
|
||||||
def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] =
|
def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] =
|
||||||
|
|
|
||||||
|
|
@ -60,13 +60,13 @@ private[akka] object InboundControlJunction {
|
||||||
* subject to get notification of incoming control
|
* subject to get notification of incoming control
|
||||||
* messages.
|
* messages.
|
||||||
*/
|
*/
|
||||||
private[akka] trait ControlMessageSubject {
|
private[remote] trait ControlMessageSubject {
|
||||||
def attach(observer: ControlMessageObserver): Future[Done]
|
def attach(observer: ControlMessageObserver): Future[Done]
|
||||||
def detach(observer: ControlMessageObserver): Unit
|
def detach(observer: ControlMessageObserver): Unit
|
||||||
def stopped: Future[Done]
|
def stopped: Future[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] trait ControlMessageObserver {
|
private[remote] trait ControlMessageObserver {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notification of incoming control message. The message
|
* Notification of incoming control message. The message
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender
|
||||||
val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet
|
val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet
|
||||||
|
|
||||||
// it must not quarantine the current connection
|
// it must not quarantine the current connection
|
||||||
RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1))
|
RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1), "test")
|
||||||
|
|
||||||
// the message from local to remote should reuse passive inbound connection
|
// the message from local to remote should reuse passive inbound connection
|
||||||
system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1)
|
system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1)
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ object RemoteWatcherSpec {
|
||||||
// that doesn't interfere with the real watch that is going on in the background
|
// that doesn't interfere with the real watch that is going on in the background
|
||||||
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
|
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
|
||||||
|
|
||||||
override def quarantine(address: Address, uid: Option[Int]): Unit = {
|
override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
|
||||||
// don't quarantine in remoting, but publish a testable message
|
// don't quarantine in remoting, but publish a testable message
|
||||||
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
|
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -769,11 +769,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
inboundHandleProbe.expectNoMsg(1.second)
|
inboundHandleProbe.expectNoMsg(1.second)
|
||||||
|
|
||||||
// Quarantine unrelated connection
|
// Quarantine unrelated connection
|
||||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1))
|
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1), "test")
|
||||||
inboundHandleProbe.expectNoMsg(1.second)
|
inboundHandleProbe.expectNoMsg(1.second)
|
||||||
|
|
||||||
// Quarantine the connection
|
// Quarantine the connection
|
||||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID))
|
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID), "test")
|
||||||
|
|
||||||
// Even though the connection is stashed it will be disassociated
|
// Even though the connection is stashed it will be disassociated
|
||||||
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
|
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ object RemoteWatcherSpec {
|
||||||
// that doesn't interfere with the real watch that is going on in the background
|
// that doesn't interfere with the real watch that is going on in the background
|
||||||
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
|
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
|
||||||
|
|
||||||
override def quarantine(address: Address, uid: Option[Int]): Unit = {
|
override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
|
||||||
// don't quarantine in remoting, but publish a testable message
|
// don't quarantine in remoting, but publish a testable message
|
||||||
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
|
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -966,6 +966,10 @@ object MiMa extends AutoPlugin {
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"),
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"),
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"),
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"),
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat")
|
||||||
|
),
|
||||||
|
"2.4.10" -> Seq(
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"),
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine")
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue