diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 8fb729930a..454b9ba81a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -90,7 +90,7 @@ private[cluster] class ClusterRemoteWatcher( if (m.address != selfAddress) { clusterNodes -= m.address if (previousStatus == MemberStatus.Down) { - quarantine(m.address, Some(m.uniqueAddress.uid)) + quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed") } publishAddressTerminated(m.address) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 2166bdac0d..7a337fa57c 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -58,7 +58,7 @@ abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldK enterBarrier("actor-identified") // Manually Quarantine the other system - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") // Quarantining is not immediate Thread.sleep(1000) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index 5fa058abda..ce0a22005b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -78,7 +78,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie enterBarrier("actor-identified") // Manually Quarantine the other system - RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst), "test") // Quarantine is up -- Cannot communicate with remote system any more system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index 10837756f1..bf66b9795a 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -80,7 +80,7 @@ abstract class RemoteRestartedQuarantinedSpec val (uid, ref) = identifyWithUid(second, "subject") - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") enterBarrier("quarantined") enterBarrier("still-quarantined") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index 8712d7cb03..1cdcc74fc1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -73,7 +73,7 @@ abstract class RemoteRestartedQuarantinedSpec val (uid, ref) = identifyWithUid(second, "subject", 5.seconds) enterBarrier("before-quarantined") - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") enterBarrier("quarantined") enterBarrier("still-quarantined") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 46b195cd4e..1aa85416f6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -449,7 +449,8 @@ private[akka] class RemoteActorRefProvider( * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid) + def quarantine(address: Address, uid: Option[Int], reason: String): Unit = + transport.quarantine(address, uid, reason) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index c865aa508d..c8abdabcba 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -90,6 +90,6 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Option[Int]): Unit + def quarantine(address: Address, uid: Option[Int], reason: String): Unit } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 1da2c8c797..7681eaf8a3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -158,7 +158,7 @@ private[akka] class RemoteWatcher( watchingNodes foreach { a ⇒ if (!unreachable(a) && !failureDetector.isAvailable(a)) { log.warning("Detected unreachable: [{}]", a) - quarantine(a, addressUids.get(a)) + quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector") publishAddressTerminated(a) unreachable += a } @@ -167,8 +167,8 @@ private[akka] class RemoteWatcher( def publishAddressTerminated(address: Address): Unit = AddressTerminatedTopic(context.system).publish(AddressTerminated(address)) - def quarantine(address: Address, uid: Option[Int]): Unit = - remoteProvider.quarantine(address, uid) + def quarantine(address: Address, uid: Option[Int], reason: String): Unit = + remoteProvider.quarantine(address, uid, reason) def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { assert(watcher != self) @@ -282,4 +282,4 @@ private[akka] class RemoteWatcher( log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path) watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ } -} \ No newline at end of file +} diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index bd59bc4f7c..21597bf459 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -225,7 +225,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null) } - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match { + override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = endpointManager match { case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid) case _ ⇒ throw new RemoteTransportExceptionNoStackTrace( s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b257f38da0..25aefe20c3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -543,7 +543,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def startAeronErrorLog(): Unit = { val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE)) val lastTimestamp = new AtomicLong(0L) - import system.dispatcher // FIXME perhaps use another dispatcher for this + import system.dispatcher aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { if (!isShutdown) { val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get) @@ -840,9 +840,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit = eventPublisher.notifyListeners(event) - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { - // FIXME change the method signature (old remoting) to include reason and use Long uid? - association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) + override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = { + // FIXME use Long uid + association(remoteAddress).quarantine(reason, uid.map(_.toLong)) } def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 66d957244d..d28df42a05 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -60,13 +60,13 @@ private[akka] object InboundControlJunction { * subject to get notification of incoming control * messages. */ - private[akka] trait ControlMessageSubject { + private[remote] trait ControlMessageSubject { def attach(observer: ControlMessageObserver): Future[Done] def detach(observer: ControlMessageObserver): Unit def stopped: Future[Done] } - private[akka] trait ControlMessageObserver { + private[remote] trait ControlMessageObserver { /** * Notification of incoming control message. The message diff --git a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala index 409891d001..7012986e2d 100644 --- a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala @@ -119,7 +119,7 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet // it must not quarantine the current connection - RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1)) + RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1), "test") // the message from local to remote should reuse passive inbound connection system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 59f23d4398..56ff462e8e 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -53,7 +53,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Int]): Unit = { + override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index e08a1a0267..701cc5c670 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -769,11 +769,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D inboundHandleProbe.expectNoMsg(1.second) // Quarantine unrelated connection - RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1)) + RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1), "test") inboundHandleProbe.expectNoMsg(1.second) // Quarantine the connection - RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID)) + RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID), "test") // Even though the connection is stashed it will be disassociated inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated] diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index ede7c7f62e..3267448214 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -54,7 +54,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Int]): Unit = { + override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } diff --git a/project/MiMa.scala b/project/MiMa.scala index 13351fc3f3..3c293e9b4f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -966,6 +966,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat") + ), + "2.4.10" -> Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine") ) ) }