From 12cbf8392706114b95098cf997398908d380077a Mon Sep 17 00:00:00 2001 From: Thibaut Robert Date: Thu, 19 Feb 2015 15:49:02 +0100 Subject: [PATCH] =rem improve remote watching mechanism This improves the remote watching mechanism as follows: Watch requests are intercepted by the RemoteWatcher and not sent on the wire, excepted watches from the remoteWatcher itself. RemoteWatcher is then in charge of forwarding DeathWatchNotification messages to the watchers. This reduces the number of watch message to one per watchee, even if there are several watcher on the same watchee (instead of n+1 before). Reversed watch messages, and watch on ref with undefinedUid are excluded from interception by the RemoteWatcher and so are handled as before this commit. In addition, the following changes are made: - Keep watchers in a map watchee -> watchers for more efficient retrieval (in a scala Multimap) - Keep watchees in a map address -> watchee for more efficient retrieval (in a scala Multimap) - Use of InternalActorRef more thoroughly to avoid casts - Rewatch use a standard watch message, as the distinction is longer needed --- .../akka/dispatch/sysmsg/SystemMessage.scala | 4 +- .../akka/cluster/ClusterRemoteWatcher.scala | 23 +- .../akka/cluster/ClusterDeathWatchSpec.scala | 29 +-- .../akka/remote/RemoteActorRefProvider.scala | 50 ++-- .../scala/akka/remote/RemoteWatcher.scala | 217 ++++++++---------- .../scala/akka/remote/RemoteWatcherSpec.scala | 32 +-- project/MiMa.scala | 20 +- 7 files changed, 173 insertions(+), 202 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala index 00032a10a1..0dff418daa 100644 --- a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala +++ b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala @@ -239,11 +239,11 @@ private[akka] final case class Supervise(child: ActorRef, async: Boolean) extend * INTERNAL API */ @SerialVersionUID(1L) -private[akka] case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch +private[akka] final case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch /** * INTERNAL API */ -@SerialVersionUID(1L) +@SerialVersionUID(1L) // Watch and Unwatch have different signatures, but this can't be changed without breaking serialization compatibility private[akka] final case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 33686f4d28..8119bde413 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -4,16 +4,13 @@ package akka.cluster import scala.concurrent.duration.FiniteDuration -import akka.actor.Actor -import akka.actor.Address -import akka.actor.Props +import akka.actor._ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher -import akka.actor.Deploy /** * INTERNAL API @@ -36,7 +33,7 @@ private[cluster] object ClusterRemoteWatcher { * * Specialization of [[akka.remote.RemoteWatcher]] that keeps * track of cluster member nodes and is responsible for watchees on cluster nodes. - * [[akka.actor.AddressTerminate]] is published when node is removed from cluster. + * [[akka.actor.AddressTerminated]] is published when node is removed from cluster. * * `RemoteWatcher` handles non-cluster nodes. `ClusterRemoteWatcher` will take * over responsibility from `RemoteWatcher` if a watch is added before a node is member @@ -53,8 +50,6 @@ private[cluster] class ClusterRemoteWatcher( unreachableReaperInterval, heartbeatExpectedResponseAfter) { - import RemoteWatcher._ - val cluster = Cluster(context.system) import cluster.selfAddress @@ -73,8 +68,6 @@ private[cluster] class ClusterRemoteWatcher( override def receive = receiveClusterEvent orElse super.receive def receiveClusterEvent: Actor.Receive = { - case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒ - () // cluster managed node, don't propagate to super case state: CurrentClusterState ⇒ clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility @@ -96,16 +89,18 @@ private[cluster] class ClusterRemoteWatcher( case _: MemberEvent ⇒ // not interesting } + override def watchNode(watchee: InternalActorRef) = + if (!clusterNodes(watchee.path.address)) super.watchNode(watchee) + /** * When a cluster node is added this class takes over the * responsibility for watchees on that node already handled * by super RemoteWatcher. */ - def takeOverResponsibility(address: Address): Unit = { - watching foreach { - case (watchee, watcher) ⇒ if (watchee.path.address == address) - unwatchRemote(watchee, watcher) + def takeOverResponsibility(address: Address): Unit = + if (watchingNodes(address)) { + log.debug("Cluster is taking over responsibility of node: [{}]", address) + unwatchNode(address) } - } } \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 17bf58b8b6..88a77b1237 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -6,28 +6,15 @@ package akka.cluster import language.postfixOps import scala.concurrent.Await import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.testkit.TestEvent._ -import akka.actor.Props -import akka.actor.Actor -import akka.actor.Address -import akka.actor.RootActorPath -import akka.actor.Terminated -import akka.actor.Address +import akka.actor._ import akka.remote.RemoteActorRef import java.util.concurrent.TimeoutException -import akka.actor.ActorSystemImpl -import akka.actor.ActorIdentity -import akka.actor.Identify -import akka.actor.ActorRef import akka.remote.RemoteWatcher -import akka.actor.ActorSystem import akka.cluster.MultiNodeClusterSpec.EndActor -import akka.actor.Deploy object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -172,7 +159,9 @@ abstract class ClusterDeathWatchSpec // fifth is not cluster member, so the watch is handled by the RemoteWatcher awaitAssert { remoteWatcher ! RemoteWatcher.Stats - expectMsgType[RemoteWatcher.Stats].watchingRefs should contain((subject5, testActor)) + val stats = expectMsgType[RemoteWatcher.Stats] + stats.watchingRefs should contain(subject5 → testActor) + stats.watchingAddresses should contain(address(fifth)) } } enterBarrier("remote-watch") @@ -181,13 +170,13 @@ abstract class ClusterDeathWatchSpec awaitClusterUp(first, fourth, fifth) runOn(first) { - // fifth is member, so the watch is handled by the ClusterRemoteWatcher, - // and cleaned up from RemoteWatcher + // fifth is member, so the node is handled by the ClusterRemoteWatcher, + // but the watch is still in RemoteWatcher awaitAssert { remoteWatcher ! RemoteWatcher.Stats - expectMsgType[RemoteWatcher.Stats].watchingRefs.map { - case (watchee, watcher) ⇒ watchee.path.name - } should not contain ("subject5") + val stats = expectMsgType[RemoteWatcher.Stats] + stats.watchingRefs.map(_._1.path.name) should contain("subject5") + stats.watchingAddresses should not contain address(fifth) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f162da91ea..189d0b1310 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -8,14 +8,12 @@ import akka.actor._ import akka.dispatch.sysmsg._ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error -import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } +import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import scala.util.control.Exception.Catcher -import scala.concurrent.{ ExecutionContext, Future } -import scala.concurrent.forkjoin.ThreadLocalRandom -import com.typesafe.config.Config +import scala.concurrent.Future import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -150,7 +148,9 @@ private[akka] class RemoteActorRefProvider( // This actor ensures the ordering of shutdown between remoteDaemon and the transport @volatile private var remotingTerminator: ActorRef = _ - @volatile private var remoteWatcher: ActorRef = _ + @volatile private var _remoteWatcher: ActorRef = _ + private[akka] def remoteWatcher = _remoteWatcher + @volatile private var remoteDeploymentWatcher: ActorRef = _ def init(system: ActorSystemImpl): Unit = { @@ -183,7 +183,7 @@ private[akka] class RemoteActorRefProvider( // this enables reception of remote requests transport.start() - remoteWatcher = createRemoteWatcher(system) + _remoteWatcher = createRemoteWatcher(system) remoteDeploymentWatcher = createRemoteDeploymentWatcher(system) } @@ -422,20 +422,6 @@ private[akka] class RemoteActorRefProvider( */ def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid) - /** - * INTERNAL API - */ - private[akka] def afterSendSystemMessage(message: SystemMessage): Unit = - message match { - // Sending to local remoteWatcher relies strong delivery guarantees of local send, i.e. - // default dispatcher must not be changed to an implementation that defeats that - case rew: RemoteWatcher.Rewatch ⇒ - remoteWatcher ! RemoteWatcher.RewatchRemote(rew.watchee, rew.watcher) - case Watch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher) - case Unwatch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) - case _ ⇒ - } - } private[akka] trait RemoteRef extends ActorRefScope { @@ -475,10 +461,30 @@ private[akka] class RemoteActorRef private[akka] ( remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send")) } + /** + * Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref + */ + def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) = + if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) { + provider.log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) + false // Not managed by the remote watcher, so not reliable to communication failure or remote system crash + } else { + // If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted + // If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers + watcher != provider.remoteWatcher && watchee == this + } + def sendSystemMessage(message: SystemMessage): Unit = try { - remote.send(message, None, this) - provider.afterSendSystemMessage(message) + //send to remote, unless watch message is intercepted by the remoteWatcher + message match { + case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) ⇒ + provider.remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher) + //Unwatch has a different signature, need to pattern match arguments against InternalActorRef + case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) ⇒ + provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) + case _ ⇒ remote.send(message, None, this) + } } catch handleException override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 0e87fce22f..2f4d2d47f5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -3,24 +3,15 @@ */ package akka.remote -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Address -import akka.actor.AddressTerminated -import akka.actor.Props -import akka.actor.RootActorPath -import akka.actor.Terminated -import akka.actor.ExtendedActorSystem import akka.ConfigurationException -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } -import akka.actor.InternalActorRef -import akka.dispatch.sysmsg.DeathWatchNotification -import akka.dispatch.sysmsg.Watch -import akka.actor.Deploy +import akka.actor._ +import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.event.AddressTerminatedTopic +import scala.collection.mutable +import scala.concurrent.duration._ + /** * INTERNAL API */ @@ -37,11 +28,8 @@ private[akka] object RemoteWatcher { Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, heartbeatExpectedResponseAfter).withDeploy(Deploy.local) - final case class WatchRemote(watchee: ActorRef, watcher: ActorRef) - final case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) - final case class RewatchRemote(watchee: ActorRef, watcher: ActorRef) - @SerialVersionUID(1L) - class Rewatch(watchee: InternalActorRef, watcher: InternalActorRef) extends Watch(watchee, watcher) + final case class WatchRemote(watchee: InternalActorRef, watcher: InternalActorRef) + final case class UnwatchRemote(watchee: InternalActorRef, watcher: InternalActorRef) @SerialVersionUID(1L) case object Heartbeat extends PriorityMessage @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends PriorityMessage @@ -54,16 +42,17 @@ private[akka] object RemoteWatcher { // test purpose object Stats { lazy val empty: Stats = counts(0, 0) - def counts(watching: Int, watchingNodes: Int): Stats = - new Stats(watching, watchingNodes)(Set.empty) + def counts(watching: Int, watchingNodes: Int): Stats = Stats(watching, watchingNodes)(Set.empty, Set.empty) } - final case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { + final case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)], + val watchingAddresses: Set[Address]) { override def toString: String = { def formatWatchingRefs: String = - if (watchingRefs.isEmpty) "" - else ", watchingRefs=" + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") + def formatWatchingAddresses: String = + watchingAddresses.mkString("[", ", ", "]") - s"Stats(watching=${watching}, watchingNodes=${watchingNodes}${formatWatchingRefs})" + s"Stats(watching=$watching, watchingNodes=$watchingNodes, watchingRefs=$formatWatchingRefs, watchingAddresses=$formatWatchingAddresses)" } } } @@ -107,10 +96,13 @@ private[akka] class RemoteWatcher( val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid) - // actors that this node is watching, tuple with (watcher, watchee) - var watching: Set[(ActorRef, ActorRef)] = Set.empty - // nodes that this node is watching, i.e. expecting hearteats from these nodes - var watchingNodes: Set[Address] = Set.empty + // actors that this node is watching, map of watchee -> Set(watchers) + val watching = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() with mutable.MultiMap[InternalActorRef, InternalActorRef] + + // nodes that this node is watching, i.e. expecting heartbeats from these nodes. Map of address -> Set(watchee) on this address + val watcheeByNodes = new mutable.HashMap[Address, mutable.Set[InternalActorRef]]() with mutable.MultiMap[Address, InternalActorRef] + def watchingNodes = watcheeByNodes.keySet + var unreachable: Set[Address] = Set.empty var addressUids: Map[Address, Int] = Map.empty @@ -125,21 +117,21 @@ private[akka] class RemoteWatcher( } def receive = { - case HeartbeatTick ⇒ sendHeartbeat() - case Heartbeat ⇒ receiveHeartbeat() - case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid) - case ReapUnreachableTick ⇒ reapUnreachable() - case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) - case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher) - case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher) - case t @ Terminated(watchee) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated) - case RewatchRemote(watchee, watcher) ⇒ rewatchRemote(watchee, watcher) + case HeartbeatTick ⇒ sendHeartbeat() + case Heartbeat ⇒ receiveHeartbeat() + case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid) + case ReapUnreachableTick ⇒ reapUnreachable() + case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) + case WatchRemote(watchee, watcher) ⇒ addWatch(watchee, watcher) + case UnwatchRemote(watchee, watcher) ⇒ removeWatch(watchee, watcher) + case t @ Terminated(watchee: InternalActorRef) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated) // test purpose case Stats ⇒ + val watchSet = watching.iterator.flatMap { case (wee, wers) ⇒ wers.map { wer ⇒ wee → wer } }.toSet[(ActorRef, ActorRef)] sender() ! Stats( - watching = watching.size, - watchingNodes = watchingNodes.size)(watching) + watching = watchSet.size, + watchingNodes = watchingNodes.size)(watchSet, watchingNodes.toSet) } def receiveHeartbeat(): Unit = @@ -153,10 +145,10 @@ private[akka] class RemoteWatcher( else log.debug("Received first heartbeat rsp from [{}]", from) - if (watchingNodes(from) && !unreachable(from)) { + if (watcheeByNodes.contains(from) && !unreachable(from)) { if (!addressUids.contains(from) || addressUids(from) != uid) reWatch(from) - addressUids += (from -> uid) + addressUids += (from → uid) failureDetector.heartbeat(from) } } @@ -177,85 +169,79 @@ private[akka] class RemoteWatcher( def quarantine(address: Address, uid: Option[Int]): Unit = remoteProvider.quarantine(address, uid) - def rewatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watching.contains((watchee, watcher))) - watchRemote(watchee, watcher) - else - //has been unwatched inbetween, skip re-watch - log.debug("Ignoring re-watch after being unwatched in the meantime: [{} -> {}]", watcher.path, watchee.path) + def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { + assert(watcher != self) + log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) + watching.addBinding(watchee, watcher) + watchNode(watchee) - def watchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) - logActorForDeprecationWarning(watchee) - else if (watcher != self) { - log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) - addWatching(watchee, watcher) + // add watch from self, this will actually send a Watch to the target when necessary + context watch watchee + } - // also watch from self, to be able to cleanup on termination of the watchee - context watch watchee - watching += ((watchee, self)) - } - - def addWatching(watchee: ActorRef, watcher: ActorRef): Unit = { - watching += ((watchee, watcher)) + def watchNode(watchee: InternalActorRef): Unit = { val watcheeAddress = watchee.path.address - if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) { + if (!watcheeByNodes.contains(watcheeAddress) && unreachable(watcheeAddress)) { // first watch to that node after a previous unreachable unreachable -= watcheeAddress failureDetector.remove(watcheeAddress) } - watchingNodes += watcheeAddress + watcheeByNodes.addBinding(watcheeAddress, watchee) } - def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) - logActorForDeprecationWarning(watchee) - else if (watcher != self) { - log.debug("Unwatching: [{} -> {}]", watcher.path, watchee.path) - watching -= ((watchee, watcher)) + def removeWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { + assert(watcher != self) + log.debug("Unwatching: [{} -> {}]", watcher.path, watchee.path) - // clean up self watch when no more watchers of this watchee - if (watching.forall { case (wee, wer) ⇒ wee != watchee || wer == self }) { - log.debug("Cleanup self watch of [{}]", watchee.path) - context unwatch watchee - watching -= ((watchee, self)) - } - checkLastUnwatchOfNode(watchee.path.address) + // Could have used removeBinding, but it does not tell if this was the last entry. This saves a contains call. + watching.get(watchee) match { + case Some(watchers) ⇒ + watchers -= watcher + if (watchers.isEmpty) { + // clean up self watch when no more watchers of this watchee + log.debug("Cleanup self watch of [{}]", watchee.path) + context unwatch watchee + removeWatchee(watchee) + } + case None ⇒ } - - def logActorForDeprecationWarning(watchee: ActorRef): Unit = { - log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) } - def terminated(watchee: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { + def removeWatchee(watchee: InternalActorRef): Unit = { + val watcheeAddress = watchee.path.address + watching -= watchee + // Could have used removeBinding, but it does not tell if this was the last entry. This saves a contains call. + watcheeByNodes.get(watcheeAddress) match { + case Some(watchees) ⇒ + watchees -= watchee + if (watchees.isEmpty) { + // unwatched last watchee on that node + log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) + unwatchNode(watcheeAddress) + } + case None ⇒ + } + } + + def unwatchNode(watcheeAddress: Address): Unit = { + watcheeByNodes -= watcheeAddress + addressUids -= watcheeAddress + failureDetector.remove(watcheeAddress) + } + + def terminated(watchee: InternalActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { log.debug("Watchee terminated: [{}]", watchee.path) - // When watchee is stopped it sends DeathWatchNotification to the watcher and to this RemoteWatcher, - // which is also watching. Send extra DeathWatchNotification to the watcher in case the - // DeathWatchNotification message is only delivered to RemoteWatcher. Otherwise there is a risk that - // the monitoring is removed, subsequent node failure is not detected and the original watcher is - // never notified. This may occur for normal system shutdown of the watchee system when not all remote - // messages are flushed at shutdown. - watching --= watching collect { - case tuple @ (wee, wer: InternalActorRef) if wee == watchee ⇒ - if (!addressTerminated && wer != self) - wer.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated)) - tuple - } + // When watchee is stopped it sends DeathWatchNotification to this RemoteWatcher, + // which will propagate it to all watchers of this watchee. + // addressTerminated case is already handled by the watcher itself in DeathWatch trait + if (!addressTerminated) + for { + watchers ← watching.get(watchee) + watcher ← watchers + } watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated)) - checkLastUnwatchOfNode(watchee.path.address) - } - - def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { - if (watchingNodes(watcheeAddress) && watching.forall { - case (wee, wer) ⇒ wee.path.address != watcheeAddress - }) { - // unwatched last watchee on that node - log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) - watchingNodes -= watcheeAddress - addressUids -= watcheeAddress - failureDetector.remove(watcheeAddress) - } + removeWatchee(watchee) } def sendHeartbeat(): Unit = @@ -274,7 +260,7 @@ private[akka] class RemoteWatcher( } def triggerFirstHeartbeat(address: Address): Unit = - if (watchingNodes(address) && !failureDetector.isMonitoring(address)) { + if (watcheeByNodes.contains(address) && !failureDetector.isMonitoring(address)) { log.debug("Trigger extra expected heartbeat from [{}]", address) failureDetector.heartbeat(address) } @@ -287,15 +273,12 @@ private[akka] class RemoteWatcher( * does not exist. */ def reWatch(address: Address): Unit = - watching.foreach { - case (wee: InternalActorRef, wer: InternalActorRef) ⇒ - if (wee.path.address == address) { - // this re-watch will result in a RewatchRemote message to this actor - // must be a special message to be able to detect if an UnwatchRemote comes in - // before the extra RewatchRemote, then the re-watch should be ignored - log.debug("Re-watch [{} -> {}]", wer, wee) - wee.sendSystemMessage(new Rewatch(wee, wer)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - } + for { + watchees ← watcheeByNodes.get(address) + watchee ← watchees + } { + val watcher = self.asInstanceOf[InternalActorRef] + log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path) + watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ } - } \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 0cfc15d294..be68af3a1a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -6,16 +6,7 @@ package akka.remote import language.postfixOps import scala.concurrent.duration._ import akka.testkit._ -import akka.actor.ActorSystem -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.ExtendedActorSystem -import akka.actor.RootActorPath -import akka.actor.Identify -import akka.actor.ActorIdentity -import akka.actor.PoisonPill -import akka.actor.Address +import akka.actor._ object RemoteWatcherSpec { @@ -101,10 +92,10 @@ class RemoteWatcherSpec extends AkkaSpec( val heartbeatRspB = HeartbeatRsp(remoteAddressUid) - def createRemoteActor(props: Props, name: String): ActorRef = { + def createRemoteActor(props: Props, name: String): InternalActorRef = { remoteSystem.actorOf(props, name) system.actorSelection(RootActorPath(remoteAddress) / "user" / name) ! Identify(name) - expectMsgType[ActorIdentity].ref.get + expectMsgType[ActorIdentity].ref.get.asInstanceOf[InternalActorRef] } "A RemoteWatcher" must { @@ -115,8 +106,8 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") - val a1 = system.actorOf(Props[MyActor], "a1") - val a2 = system.actorOf(Props[MyActor], "a2") + val a1 = system.actorOf(Props[MyActor], "a1").asInstanceOf[InternalActorRef] + val a2 = system.actorOf(Props[MyActor], "a2").asInstanceOf[InternalActorRef] val b1 = createRemoteActor(Props[MyActor], "b1") val b2 = createRemoteActor(Props[MyActor], "b2") @@ -124,9 +115,8 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(b2, a1) monitorA ! WatchRemote(b2, a2) monitorA ! Stats - // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2 // (a1->b1), (a1->b2), (a2->b2) - expectMsg(Stats.counts(watching = 5, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -142,7 +132,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! UnwatchRemote(b1, a1) // still (a1->b2) and (a2->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -151,7 +141,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! UnwatchRemote(b2, a2) // still (a1->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 1, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -180,7 +170,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4") - val a = system.actorOf(Props[MyActor], "a4") + val a = system.actorOf(Props[MyActor], "a4").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b4") monitorA ! WatchRemote(b, a) @@ -219,7 +209,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") - val a = system.actorOf(Props[MyActor], "a5") + val a = system.actorOf(Props[MyActor], "a5").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b5") monitorA ! WatchRemote(b, a) @@ -253,7 +243,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6") - val a = system.actorOf(Props[MyActor], "a6") + val a = system.actorOf(Props[MyActor], "a6").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b6") monitorA ! WatchRemote(b, a) diff --git a/project/MiMa.scala b/project/MiMa.scala index 365af980eb..b680670009 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -3,11 +3,9 @@ */ package akka -import sbt._ -import sbt.Keys._ -import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters -import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact +import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact} import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings +import sbt._ object MiMa extends AutoPlugin { @@ -407,7 +405,8 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.ClusterRouterPoolSettings"), ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.MixMetricsSelector"), ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.ClusterRouterGroupSettings"), - + ProblemFilters.exclude[FinalClassProblem]("akka.dispatch.sysmsg.Watch"), + // changed to static method, source compatible is enough ProblemFilters.exclude[MissingMethodProblem]("akka.testkit.JavaTestKit.shutdownActorSystem"), // testActorName()java.lang.String in trait akka.testkit.TestKitBase does not have a correspondent in old version @@ -529,8 +528,17 @@ object MiMa extends AutoPlugin { // synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in class akka.dispatch.MessageDispatcher does not have a correspondent in new version ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$$_blockContext"), // issue #16736 - ProblemFilters.exclude[MissingClassProblem]("akka.cluster.OnMemberUpListener") + ProblemFilters.exclude[MissingClassProblem]("akka.cluster.OnMemberUpListener"), + //changes introduced by #16911 + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.RemoteActorRefProvider.afterSendSystemMessage"), + FilterAnyProblem("akka.remote.RemoteWatcher"), + FilterAnyProblem("akka.remote.RemoteWatcher$WatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$UnwatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$Rewatch"), + FilterAnyProblem("akka.remote.RemoteWatcher$RewatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$Stats") + ) } }