diff --git a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala index 00032a10a1..0dff418daa 100644 --- a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala +++ b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala @@ -239,11 +239,11 @@ private[akka] final case class Supervise(child: ActorRef, async: Boolean) extend * INTERNAL API */ @SerialVersionUID(1L) -private[akka] case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch +private[akka] final case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch /** * INTERNAL API */ -@SerialVersionUID(1L) +@SerialVersionUID(1L) // Watch and Unwatch have different signatures, but this can't be changed without breaking serialization compatibility private[akka] final case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 33686f4d28..8119bde413 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -4,16 +4,13 @@ package akka.cluster import scala.concurrent.duration.FiniteDuration -import akka.actor.Actor -import akka.actor.Address -import akka.actor.Props +import akka.actor._ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher -import akka.actor.Deploy /** * INTERNAL API @@ -36,7 +33,7 @@ private[cluster] object ClusterRemoteWatcher { * * Specialization of [[akka.remote.RemoteWatcher]] that keeps * track of cluster member nodes and is responsible for watchees on cluster nodes. - * [[akka.actor.AddressTerminate]] is published when node is removed from cluster. + * [[akka.actor.AddressTerminated]] is published when node is removed from cluster. * * `RemoteWatcher` handles non-cluster nodes. `ClusterRemoteWatcher` will take * over responsibility from `RemoteWatcher` if a watch is added before a node is member @@ -53,8 +50,6 @@ private[cluster] class ClusterRemoteWatcher( unreachableReaperInterval, heartbeatExpectedResponseAfter) { - import RemoteWatcher._ - val cluster = Cluster(context.system) import cluster.selfAddress @@ -73,8 +68,6 @@ private[cluster] class ClusterRemoteWatcher( override def receive = receiveClusterEvent orElse super.receive def receiveClusterEvent: Actor.Receive = { - case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒ - () // cluster managed node, don't propagate to super case state: CurrentClusterState ⇒ clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility @@ -96,16 +89,18 @@ private[cluster] class ClusterRemoteWatcher( case _: MemberEvent ⇒ // not interesting } + override def watchNode(watchee: InternalActorRef) = + if (!clusterNodes(watchee.path.address)) super.watchNode(watchee) + /** * When a cluster node is added this class takes over the * responsibility for watchees on that node already handled * by super RemoteWatcher. */ - def takeOverResponsibility(address: Address): Unit = { - watching foreach { - case (watchee, watcher) ⇒ if (watchee.path.address == address) - unwatchRemote(watchee, watcher) + def takeOverResponsibility(address: Address): Unit = + if (watchingNodes(address)) { + log.debug("Cluster is taking over responsibility of node: [{}]", address) + unwatchNode(address) } - } } \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 17bf58b8b6..88a77b1237 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -6,28 +6,15 @@ package akka.cluster import language.postfixOps import scala.concurrent.Await import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.testkit.TestEvent._ -import akka.actor.Props -import akka.actor.Actor -import akka.actor.Address -import akka.actor.RootActorPath -import akka.actor.Terminated -import akka.actor.Address +import akka.actor._ import akka.remote.RemoteActorRef import java.util.concurrent.TimeoutException -import akka.actor.ActorSystemImpl -import akka.actor.ActorIdentity -import akka.actor.Identify -import akka.actor.ActorRef import akka.remote.RemoteWatcher -import akka.actor.ActorSystem import akka.cluster.MultiNodeClusterSpec.EndActor -import akka.actor.Deploy object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -172,7 +159,9 @@ abstract class ClusterDeathWatchSpec // fifth is not cluster member, so the watch is handled by the RemoteWatcher awaitAssert { remoteWatcher ! RemoteWatcher.Stats - expectMsgType[RemoteWatcher.Stats].watchingRefs should contain((subject5, testActor)) + val stats = expectMsgType[RemoteWatcher.Stats] + stats.watchingRefs should contain(subject5 → testActor) + stats.watchingAddresses should contain(address(fifth)) } } enterBarrier("remote-watch") @@ -181,13 +170,13 @@ abstract class ClusterDeathWatchSpec awaitClusterUp(first, fourth, fifth) runOn(first) { - // fifth is member, so the watch is handled by the ClusterRemoteWatcher, - // and cleaned up from RemoteWatcher + // fifth is member, so the node is handled by the ClusterRemoteWatcher, + // but the watch is still in RemoteWatcher awaitAssert { remoteWatcher ! RemoteWatcher.Stats - expectMsgType[RemoteWatcher.Stats].watchingRefs.map { - case (watchee, watcher) ⇒ watchee.path.name - } should not contain ("subject5") + val stats = expectMsgType[RemoteWatcher.Stats] + stats.watchingRefs.map(_._1.path.name) should contain("subject5") + stats.watchingAddresses should not contain address(fifth) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f162da91ea..189d0b1310 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -8,14 +8,12 @@ import akka.actor._ import akka.dispatch.sysmsg._ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error -import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } +import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import scala.util.control.Exception.Catcher -import scala.concurrent.{ ExecutionContext, Future } -import scala.concurrent.forkjoin.ThreadLocalRandom -import com.typesafe.config.Config +import scala.concurrent.Future import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -150,7 +148,9 @@ private[akka] class RemoteActorRefProvider( // This actor ensures the ordering of shutdown between remoteDaemon and the transport @volatile private var remotingTerminator: ActorRef = _ - @volatile private var remoteWatcher: ActorRef = _ + @volatile private var _remoteWatcher: ActorRef = _ + private[akka] def remoteWatcher = _remoteWatcher + @volatile private var remoteDeploymentWatcher: ActorRef = _ def init(system: ActorSystemImpl): Unit = { @@ -183,7 +183,7 @@ private[akka] class RemoteActorRefProvider( // this enables reception of remote requests transport.start() - remoteWatcher = createRemoteWatcher(system) + _remoteWatcher = createRemoteWatcher(system) remoteDeploymentWatcher = createRemoteDeploymentWatcher(system) } @@ -422,20 +422,6 @@ private[akka] class RemoteActorRefProvider( */ def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid) - /** - * INTERNAL API - */ - private[akka] def afterSendSystemMessage(message: SystemMessage): Unit = - message match { - // Sending to local remoteWatcher relies strong delivery guarantees of local send, i.e. - // default dispatcher must not be changed to an implementation that defeats that - case rew: RemoteWatcher.Rewatch ⇒ - remoteWatcher ! RemoteWatcher.RewatchRemote(rew.watchee, rew.watcher) - case Watch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher) - case Unwatch(watchee, watcher) ⇒ remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) - case _ ⇒ - } - } private[akka] trait RemoteRef extends ActorRefScope { @@ -475,10 +461,30 @@ private[akka] class RemoteActorRef private[akka] ( remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send")) } + /** + * Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref + */ + def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) = + if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) { + provider.log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) + false // Not managed by the remote watcher, so not reliable to communication failure or remote system crash + } else { + // If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted + // If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers + watcher != provider.remoteWatcher && watchee == this + } + def sendSystemMessage(message: SystemMessage): Unit = try { - remote.send(message, None, this) - provider.afterSendSystemMessage(message) + //send to remote, unless watch message is intercepted by the remoteWatcher + message match { + case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) ⇒ + provider.remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher) + //Unwatch has a different signature, need to pattern match arguments against InternalActorRef + case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) ⇒ + provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) + case _ ⇒ remote.send(message, None, this) + } } catch handleException override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 0e87fce22f..2f4d2d47f5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -3,24 +3,15 @@ */ package akka.remote -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Address -import akka.actor.AddressTerminated -import akka.actor.Props -import akka.actor.RootActorPath -import akka.actor.Terminated -import akka.actor.ExtendedActorSystem import akka.ConfigurationException -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } -import akka.actor.InternalActorRef -import akka.dispatch.sysmsg.DeathWatchNotification -import akka.dispatch.sysmsg.Watch -import akka.actor.Deploy +import akka.actor._ +import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.event.AddressTerminatedTopic +import scala.collection.mutable +import scala.concurrent.duration._ + /** * INTERNAL API */ @@ -37,11 +28,8 @@ private[akka] object RemoteWatcher { Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, heartbeatExpectedResponseAfter).withDeploy(Deploy.local) - final case class WatchRemote(watchee: ActorRef, watcher: ActorRef) - final case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) - final case class RewatchRemote(watchee: ActorRef, watcher: ActorRef) - @SerialVersionUID(1L) - class Rewatch(watchee: InternalActorRef, watcher: InternalActorRef) extends Watch(watchee, watcher) + final case class WatchRemote(watchee: InternalActorRef, watcher: InternalActorRef) + final case class UnwatchRemote(watchee: InternalActorRef, watcher: InternalActorRef) @SerialVersionUID(1L) case object Heartbeat extends PriorityMessage @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends PriorityMessage @@ -54,16 +42,17 @@ private[akka] object RemoteWatcher { // test purpose object Stats { lazy val empty: Stats = counts(0, 0) - def counts(watching: Int, watchingNodes: Int): Stats = - new Stats(watching, watchingNodes)(Set.empty) + def counts(watching: Int, watchingNodes: Int): Stats = Stats(watching, watchingNodes)(Set.empty, Set.empty) } - final case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { + final case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)], + val watchingAddresses: Set[Address]) { override def toString: String = { def formatWatchingRefs: String = - if (watchingRefs.isEmpty) "" - else ", watchingRefs=" + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") + def formatWatchingAddresses: String = + watchingAddresses.mkString("[", ", ", "]") - s"Stats(watching=${watching}, watchingNodes=${watchingNodes}${formatWatchingRefs})" + s"Stats(watching=$watching, watchingNodes=$watchingNodes, watchingRefs=$formatWatchingRefs, watchingAddresses=$formatWatchingAddresses)" } } } @@ -107,10 +96,13 @@ private[akka] class RemoteWatcher( val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid) - // actors that this node is watching, tuple with (watcher, watchee) - var watching: Set[(ActorRef, ActorRef)] = Set.empty - // nodes that this node is watching, i.e. expecting hearteats from these nodes - var watchingNodes: Set[Address] = Set.empty + // actors that this node is watching, map of watchee -> Set(watchers) + val watching = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() with mutable.MultiMap[InternalActorRef, InternalActorRef] + + // nodes that this node is watching, i.e. expecting heartbeats from these nodes. Map of address -> Set(watchee) on this address + val watcheeByNodes = new mutable.HashMap[Address, mutable.Set[InternalActorRef]]() with mutable.MultiMap[Address, InternalActorRef] + def watchingNodes = watcheeByNodes.keySet + var unreachable: Set[Address] = Set.empty var addressUids: Map[Address, Int] = Map.empty @@ -125,21 +117,21 @@ private[akka] class RemoteWatcher( } def receive = { - case HeartbeatTick ⇒ sendHeartbeat() - case Heartbeat ⇒ receiveHeartbeat() - case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid) - case ReapUnreachableTick ⇒ reapUnreachable() - case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) - case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher) - case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher) - case t @ Terminated(watchee) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated) - case RewatchRemote(watchee, watcher) ⇒ rewatchRemote(watchee, watcher) + case HeartbeatTick ⇒ sendHeartbeat() + case Heartbeat ⇒ receiveHeartbeat() + case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid) + case ReapUnreachableTick ⇒ reapUnreachable() + case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) + case WatchRemote(watchee, watcher) ⇒ addWatch(watchee, watcher) + case UnwatchRemote(watchee, watcher) ⇒ removeWatch(watchee, watcher) + case t @ Terminated(watchee: InternalActorRef) ⇒ terminated(watchee, t.existenceConfirmed, t.addressTerminated) // test purpose case Stats ⇒ + val watchSet = watching.iterator.flatMap { case (wee, wers) ⇒ wers.map { wer ⇒ wee → wer } }.toSet[(ActorRef, ActorRef)] sender() ! Stats( - watching = watching.size, - watchingNodes = watchingNodes.size)(watching) + watching = watchSet.size, + watchingNodes = watchingNodes.size)(watchSet, watchingNodes.toSet) } def receiveHeartbeat(): Unit = @@ -153,10 +145,10 @@ private[akka] class RemoteWatcher( else log.debug("Received first heartbeat rsp from [{}]", from) - if (watchingNodes(from) && !unreachable(from)) { + if (watcheeByNodes.contains(from) && !unreachable(from)) { if (!addressUids.contains(from) || addressUids(from) != uid) reWatch(from) - addressUids += (from -> uid) + addressUids += (from → uid) failureDetector.heartbeat(from) } } @@ -177,85 +169,79 @@ private[akka] class RemoteWatcher( def quarantine(address: Address, uid: Option[Int]): Unit = remoteProvider.quarantine(address, uid) - def rewatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watching.contains((watchee, watcher))) - watchRemote(watchee, watcher) - else - //has been unwatched inbetween, skip re-watch - log.debug("Ignoring re-watch after being unwatched in the meantime: [{} -> {}]", watcher.path, watchee.path) + def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { + assert(watcher != self) + log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) + watching.addBinding(watchee, watcher) + watchNode(watchee) - def watchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) - logActorForDeprecationWarning(watchee) - else if (watcher != self) { - log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) - addWatching(watchee, watcher) + // add watch from self, this will actually send a Watch to the target when necessary + context watch watchee + } - // also watch from self, to be able to cleanup on termination of the watchee - context watch watchee - watching += ((watchee, self)) - } - - def addWatching(watchee: ActorRef, watcher: ActorRef): Unit = { - watching += ((watchee, watcher)) + def watchNode(watchee: InternalActorRef): Unit = { val watcheeAddress = watchee.path.address - if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) { + if (!watcheeByNodes.contains(watcheeAddress) && unreachable(watcheeAddress)) { // first watch to that node after a previous unreachable unreachable -= watcheeAddress failureDetector.remove(watcheeAddress) } - watchingNodes += watcheeAddress + watcheeByNodes.addBinding(watcheeAddress, watchee) } - def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = - if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) - logActorForDeprecationWarning(watchee) - else if (watcher != self) { - log.debug("Unwatching: [{} -> {}]", watcher.path, watchee.path) - watching -= ((watchee, watcher)) + def removeWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { + assert(watcher != self) + log.debug("Unwatching: [{} -> {}]", watcher.path, watchee.path) - // clean up self watch when no more watchers of this watchee - if (watching.forall { case (wee, wer) ⇒ wee != watchee || wer == self }) { - log.debug("Cleanup self watch of [{}]", watchee.path) - context unwatch watchee - watching -= ((watchee, self)) - } - checkLastUnwatchOfNode(watchee.path.address) + // Could have used removeBinding, but it does not tell if this was the last entry. This saves a contains call. + watching.get(watchee) match { + case Some(watchers) ⇒ + watchers -= watcher + if (watchers.isEmpty) { + // clean up self watch when no more watchers of this watchee + log.debug("Cleanup self watch of [{}]", watchee.path) + context unwatch watchee + removeWatchee(watchee) + } + case None ⇒ } - - def logActorForDeprecationWarning(watchee: ActorRef): Unit = { - log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path) } - def terminated(watchee: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { + def removeWatchee(watchee: InternalActorRef): Unit = { + val watcheeAddress = watchee.path.address + watching -= watchee + // Could have used removeBinding, but it does not tell if this was the last entry. This saves a contains call. + watcheeByNodes.get(watcheeAddress) match { + case Some(watchees) ⇒ + watchees -= watchee + if (watchees.isEmpty) { + // unwatched last watchee on that node + log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) + unwatchNode(watcheeAddress) + } + case None ⇒ + } + } + + def unwatchNode(watcheeAddress: Address): Unit = { + watcheeByNodes -= watcheeAddress + addressUids -= watcheeAddress + failureDetector.remove(watcheeAddress) + } + + def terminated(watchee: InternalActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { log.debug("Watchee terminated: [{}]", watchee.path) - // When watchee is stopped it sends DeathWatchNotification to the watcher and to this RemoteWatcher, - // which is also watching. Send extra DeathWatchNotification to the watcher in case the - // DeathWatchNotification message is only delivered to RemoteWatcher. Otherwise there is a risk that - // the monitoring is removed, subsequent node failure is not detected and the original watcher is - // never notified. This may occur for normal system shutdown of the watchee system when not all remote - // messages are flushed at shutdown. - watching --= watching collect { - case tuple @ (wee, wer: InternalActorRef) if wee == watchee ⇒ - if (!addressTerminated && wer != self) - wer.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated)) - tuple - } + // When watchee is stopped it sends DeathWatchNotification to this RemoteWatcher, + // which will propagate it to all watchers of this watchee. + // addressTerminated case is already handled by the watcher itself in DeathWatch trait + if (!addressTerminated) + for { + watchers ← watching.get(watchee) + watcher ← watchers + } watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed, addressTerminated)) - checkLastUnwatchOfNode(watchee.path.address) - } - - def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { - if (watchingNodes(watcheeAddress) && watching.forall { - case (wee, wer) ⇒ wee.path.address != watcheeAddress - }) { - // unwatched last watchee on that node - log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) - watchingNodes -= watcheeAddress - addressUids -= watcheeAddress - failureDetector.remove(watcheeAddress) - } + removeWatchee(watchee) } def sendHeartbeat(): Unit = @@ -274,7 +260,7 @@ private[akka] class RemoteWatcher( } def triggerFirstHeartbeat(address: Address): Unit = - if (watchingNodes(address) && !failureDetector.isMonitoring(address)) { + if (watcheeByNodes.contains(address) && !failureDetector.isMonitoring(address)) { log.debug("Trigger extra expected heartbeat from [{}]", address) failureDetector.heartbeat(address) } @@ -287,15 +273,12 @@ private[akka] class RemoteWatcher( * does not exist. */ def reWatch(address: Address): Unit = - watching.foreach { - case (wee: InternalActorRef, wer: InternalActorRef) ⇒ - if (wee.path.address == address) { - // this re-watch will result in a RewatchRemote message to this actor - // must be a special message to be able to detect if an UnwatchRemote comes in - // before the extra RewatchRemote, then the re-watch should be ignored - log.debug("Re-watch [{} -> {}]", wer, wee) - wee.sendSystemMessage(new Rewatch(wee, wer)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - } + for { + watchees ← watcheeByNodes.get(address) + watchee ← watchees + } { + val watcher = self.asInstanceOf[InternalActorRef] + log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path) + watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ } - } \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 0cfc15d294..be68af3a1a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -6,16 +6,7 @@ package akka.remote import language.postfixOps import scala.concurrent.duration._ import akka.testkit._ -import akka.actor.ActorSystem -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.ExtendedActorSystem -import akka.actor.RootActorPath -import akka.actor.Identify -import akka.actor.ActorIdentity -import akka.actor.PoisonPill -import akka.actor.Address +import akka.actor._ object RemoteWatcherSpec { @@ -101,10 +92,10 @@ class RemoteWatcherSpec extends AkkaSpec( val heartbeatRspB = HeartbeatRsp(remoteAddressUid) - def createRemoteActor(props: Props, name: String): ActorRef = { + def createRemoteActor(props: Props, name: String): InternalActorRef = { remoteSystem.actorOf(props, name) system.actorSelection(RootActorPath(remoteAddress) / "user" / name) ! Identify(name) - expectMsgType[ActorIdentity].ref.get + expectMsgType[ActorIdentity].ref.get.asInstanceOf[InternalActorRef] } "A RemoteWatcher" must { @@ -115,8 +106,8 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") - val a1 = system.actorOf(Props[MyActor], "a1") - val a2 = system.actorOf(Props[MyActor], "a2") + val a1 = system.actorOf(Props[MyActor], "a1").asInstanceOf[InternalActorRef] + val a2 = system.actorOf(Props[MyActor], "a2").asInstanceOf[InternalActorRef] val b1 = createRemoteActor(Props[MyActor], "b1") val b2 = createRemoteActor(Props[MyActor], "b2") @@ -124,9 +115,8 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(b2, a1) monitorA ! WatchRemote(b2, a2) monitorA ! Stats - // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2 // (a1->b1), (a1->b2), (a2->b2) - expectMsg(Stats.counts(watching = 5, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -142,7 +132,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! UnwatchRemote(b1, a1) // still (a1->b2) and (a2->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -151,7 +141,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! UnwatchRemote(b2, a2) // still (a1->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) + expectMsg(Stats.counts(watching = 1, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) @@ -180,7 +170,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4") - val a = system.actorOf(Props[MyActor], "a4") + val a = system.actorOf(Props[MyActor], "a4").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b4") monitorA ! WatchRemote(b, a) @@ -219,7 +209,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") - val a = system.actorOf(Props[MyActor], "a5") + val a = system.actorOf(Props[MyActor], "a5").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b5") monitorA ! WatchRemote(b, a) @@ -253,7 +243,7 @@ class RemoteWatcherSpec extends AkkaSpec( val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6") - val a = system.actorOf(Props[MyActor], "a6") + val a = system.actorOf(Props[MyActor], "a6").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b6") monitorA ! WatchRemote(b, a) diff --git a/project/MiMa.scala b/project/MiMa.scala index 365af980eb..b680670009 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -3,11 +3,9 @@ */ package akka -import sbt._ -import sbt.Keys._ -import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters -import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact +import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact} import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings +import sbt._ object MiMa extends AutoPlugin { @@ -407,7 +405,8 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.ClusterRouterPoolSettings"), ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.MixMetricsSelector"), ProblemFilters.exclude[FinalClassProblem]("akka.cluster.routing.ClusterRouterGroupSettings"), - + ProblemFilters.exclude[FinalClassProblem]("akka.dispatch.sysmsg.Watch"), + // changed to static method, source compatible is enough ProblemFilters.exclude[MissingMethodProblem]("akka.testkit.JavaTestKit.shutdownActorSystem"), // testActorName()java.lang.String in trait akka.testkit.TestKitBase does not have a correspondent in old version @@ -529,8 +528,17 @@ object MiMa extends AutoPlugin { // synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in class akka.dispatch.MessageDispatcher does not have a correspondent in new version ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$$_blockContext"), // issue #16736 - ProblemFilters.exclude[MissingClassProblem]("akka.cluster.OnMemberUpListener") + ProblemFilters.exclude[MissingClassProblem]("akka.cluster.OnMemberUpListener"), + //changes introduced by #16911 + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.RemoteActorRefProvider.afterSendSystemMessage"), + FilterAnyProblem("akka.remote.RemoteWatcher"), + FilterAnyProblem("akka.remote.RemoteWatcher$WatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$UnwatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$Rewatch"), + FilterAnyProblem("akka.remote.RemoteWatcher$RewatchRemote"), + FilterAnyProblem("akka.remote.RemoteWatcher$Stats") + ) } }