=rem improve remote watching mechanism

This improves the remote watching mechanism as follows: Watch requests
are intercepted by the RemoteWatcher and not sent on the wire,
excepted watches from the remoteWatcher itself.

RemoteWatcher is then in charge of forwarding DeathWatchNotification
messages to the watchers.

This reduces the number of watch message to one per watchee, even if
there are several watcher on the same watchee (instead of n+1 before).

Reversed watch messages, and watch on ref with undefinedUid are excluded from
interception by the RemoteWatcher and so are handled as before this commit.

In addition, the following changes are made:
- Keep watchers in a map watchee -> watchers for more efficient retrieval
(in a scala Multimap)
- Keep watchees in a map address -> watchee for more efficient retrieval
(in a scala Multimap)
- Use of InternalActorRef more thoroughly to avoid casts
- Rewatch use a standard watch message, as the distinction is longer needed
This commit is contained in:
Thibaut Robert 2015-02-19 15:49:02 +01:00
parent 493666999c
commit 12cbf83927
7 changed files with 173 additions and 202 deletions

View file

@ -8,14 +8,12 @@ import akka.actor._
import akka.dispatch.sysmsg._
import akka.event.{ Logging, LoggingAdapter, EventStream }
import akka.event.Logging.Error
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.pattern.pipe
import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
import scala.util.control.Exception.Catcher
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.Config
import scala.concurrent.Future
import akka.ConfigurationException
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
@ -150,7 +148,9 @@ private[akka] class RemoteActorRefProvider(
// This actor ensures the ordering of shutdown between remoteDaemon and the transport
@volatile private var remotingTerminator: ActorRef = _
@volatile private var remoteWatcher: ActorRef = _
@volatile private var _remoteWatcher: ActorRef = _
private[akka] def remoteWatcher = _remoteWatcher
@volatile private var remoteDeploymentWatcher: ActorRef = _
def init(system: ActorSystemImpl): Unit = {
@ -183,7 +183,7 @@ private[akka] class RemoteActorRefProvider(
// this enables reception of remote requests
transport.start()
remoteWatcher = createRemoteWatcher(system)
_remoteWatcher = createRemoteWatcher(system)
remoteDeploymentWatcher = createRemoteDeploymentWatcher(system)
}
@ -422,20 +422,6 @@ private[akka] class RemoteActorRefProvider(
*/
def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid)
/**
* INTERNAL API
*/
private[akka] def afterSendSystemMessage(message: SystemMessage): Unit =
message match {
// Sending to local remoteWatcher relies strong delivery guarantees of local send, i.e.
// default dispatcher must not be changed to an implementation that defeats that
case rew: RemoteWatcher.Rewatch
remoteWatcher ! RemoteWatcher.RewatchRemote(rew.watchee, rew.watcher)
case Watch(watchee, watcher) remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher)
case Unwatch(watchee, watcher) remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
case _
}
}
private[akka] trait RemoteRef extends ActorRefScope {
@ -475,10 +461,30 @@ private[akka] class RemoteActorRef private[akka] (
remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send"))
}
/**
* Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref
*/
def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) =
if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) {
provider.log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path)
false // Not managed by the remote watcher, so not reliable to communication failure or remote system crash
} else {
// If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted
// If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers
watcher != provider.remoteWatcher && watchee == this
}
def sendSystemMessage(message: SystemMessage): Unit =
try {
remote.send(message, None, this)
provider.afterSendSystemMessage(message)
//send to remote, unless watch message is intercepted by the remoteWatcher
message match {
case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher)
provider.remoteWatcher ! RemoteWatcher.WatchRemote(watchee, watcher)
//Unwatch has a different signature, need to pattern match arguments against InternalActorRef
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher)
provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
case _ remote.send(message, None, this)
}
} catch handleException
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {