make remote supervision and path continuation work
- add supervisor to remote USE message - make remoteDaemon a VirtualPathContainer like LocalActorRefProvider.tempContainer (i.e. synchonous with CHM-based child lookup), scrap remoteDaemonSupervisor and rename remoteDaemon to “/remote” to match the plans in the docs - comment out the remote deployment configuration section, to be done when Henrik is finished with RoutedActorRef work - for now only “remote.nodes = ["sys@host:port"]” is looked at, i.e. if at least one is present, the first one is used to determine where to deploy the currently created child (routers will do the scaling-out component) [rest is commented out] - multi-jvm tests not yet re-enabled (need to be adapted), but all other tests are GREEN (at least on my machine)
This commit is contained in:
parent
fac840adfc
commit
e5bd8b5f88
10 changed files with 391 additions and 270 deletions
|
|
@ -4,9 +4,8 @@
|
|||
|
||||
package akka.remote
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor._
|
||||
import akka.event.Logging
|
||||
import akka.event._
|
||||
import akka.actor.Status._
|
||||
import akka.util._
|
||||
import akka.util.duration._
|
||||
|
|
@ -17,13 +16,12 @@ import akka.remote.RemoteProtocol._
|
|||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||
import java.net.InetSocketAddress
|
||||
import com.eaio.uuid.UUID
|
||||
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
|
||||
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension }
|
||||
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.dispatch.SystemMessage
|
||||
import akka.event.LoggingAdapter
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
|
||||
|
|
@ -37,33 +35,15 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
|
|||
import system._
|
||||
import settings._
|
||||
|
||||
private[remote] val serialization = SerializationExtension(system)
|
||||
private[remote] val remoteAddress = {
|
||||
RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)
|
||||
}
|
||||
val serialization = SerializationExtension(system)
|
||||
|
||||
val remoteAddress = RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)
|
||||
|
||||
val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize)
|
||||
|
||||
// val gossiper = new Gossiper(this)
|
||||
|
||||
val remoteDaemonServiceName = "akka-system-remote-daemon".intern
|
||||
|
||||
val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
|
||||
|
||||
// FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408
|
||||
val remoteDaemonSupervisor =
|
||||
system.provider.actorOf(system,
|
||||
Props(OneForOneStrategy(List(classOf[Exception]), None, None)),
|
||||
system.provider.rootGuardian,
|
||||
"akka-system-remote-supervisor",
|
||||
systemService = true) // is infinite restart what we want?
|
||||
|
||||
val remoteDaemon =
|
||||
system.provider.actorOf(system,
|
||||
Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)),
|
||||
remoteDaemonSupervisor,
|
||||
remoteDaemonServiceName,
|
||||
systemService = true)
|
||||
val remoteDaemon = new RemoteSystemDaemon(this, provider.rootPath / "remote", provider.rootGuardian, log)
|
||||
|
||||
val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
|
|
@ -96,7 +76,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
|
|||
}
|
||||
}
|
||||
|
||||
log.info("Starting remote server on [{}] and starting remoteDaemon {}", remoteAddress, remoteDaemon)
|
||||
log.info("Starting remote server on [{}]", remoteAddress)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -106,75 +86,89 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||
class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
|
||||
extends VirtualPathContainer(_path, _parent, _log) {
|
||||
|
||||
import remote._
|
||||
import remote.{ system ⇒ systemImpl }
|
||||
/**
|
||||
* Find the longest matching path which we know about and return that ref
|
||||
* (or ask that ref to continue searching if elements are left).
|
||||
*/
|
||||
override def getChild(names: Iterator[String]): InternalActorRef = {
|
||||
|
||||
override def preRestart(reason: Throwable, msg: Option[Any]) {
|
||||
log.debug("RemoteSystemDaemon failed due to [{}] - restarting...", reason)
|
||||
@tailrec
|
||||
def rec(s: String, n: Int): (InternalActorRef, Int) = {
|
||||
getChild(s) match {
|
||||
case null ⇒
|
||||
val last = s.lastIndexOf('/')
|
||||
if (last == -1) (Nobody, n)
|
||||
else rec(s.substring(0, last), n + 1)
|
||||
case ref ⇒ (ref, n)
|
||||
}
|
||||
}
|
||||
|
||||
val full = Vector() ++ names
|
||||
rec(full.mkString("/"), 0) match {
|
||||
case (Nobody, _) ⇒ Nobody
|
||||
case (ref, n) if n == 0 ⇒ ref
|
||||
case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator)
|
||||
}
|
||||
}
|
||||
|
||||
def receive: Actor.Receive = {
|
||||
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
|
||||
case message: RemoteSystemDaemonMessageProtocol ⇒
|
||||
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, nodename)
|
||||
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.nodename)
|
||||
|
||||
message.getMessageType match {
|
||||
case USE ⇒ handleUse(message)
|
||||
case RELEASE ⇒ handleRelease(message)
|
||||
case USE ⇒ handleUse(message)
|
||||
case RELEASE ⇒ handleRelease(message)
|
||||
// case STOP ⇒ cluster.shutdown()
|
||||
// case DISCONNECT ⇒ cluster.disconnect()
|
||||
// case RECONNECT ⇒ cluster.reconnect()
|
||||
// case RESIGN ⇒ cluster.resign()
|
||||
// case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
|
||||
case GOSSIP ⇒ handleGossip(message)
|
||||
case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
|
||||
case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message)
|
||||
case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
|
||||
case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message)
|
||||
//TODO: should we not deal with unrecognized message types?
|
||||
case GOSSIP ⇒ handleGossip(message)
|
||||
// case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
|
||||
// case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message, sender)
|
||||
// case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
|
||||
// case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message, sender)
|
||||
case unknown ⇒ log.warning("Unknown message type {} received by {}", unknown, this)
|
||||
}
|
||||
|
||||
case unknown ⇒ log.warning("Unknown message to RemoteSystemDaemon [{}]", unknown)
|
||||
case Terminated(child) ⇒ removeChild(child.path.elements.drop(1).mkString("/"))
|
||||
|
||||
case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this)
|
||||
}
|
||||
|
||||
def handleUse(message: RemoteSystemDaemonMessageProtocol) {
|
||||
try {
|
||||
if (message.hasActorPath) {
|
||||
|
||||
val actorFactoryBytes =
|
||||
if (remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
|
||||
if (!message.hasActorPath || !message.hasSupervisor) log.error("Ignoring incomplete USE command [{}]", message)
|
||||
else {
|
||||
|
||||
val actorFactory =
|
||||
serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor]
|
||||
}
|
||||
val actorFactoryBytes =
|
||||
if (remote.remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray)
|
||||
else message.getPayload.toByteArray
|
||||
|
||||
message.getActorPath match {
|
||||
case RemoteActorPath(`remoteAddress`, elems) if elems.size > 0 ⇒
|
||||
val name = elems.last
|
||||
systemImpl.provider.actorFor(systemImpl.lookupRoot, elems.dropRight(1)) match {
|
||||
case x if x eq system.deadLetters ⇒
|
||||
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
|
||||
case parent ⇒
|
||||
systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent, name)
|
||||
}
|
||||
case _ ⇒
|
||||
log.error("remote path does not match path from message [{}]", message)
|
||||
val actorFactory =
|
||||
remote.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor]
|
||||
}
|
||||
|
||||
} else {
|
||||
log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message)
|
||||
import remote.remoteAddress
|
||||
|
||||
message.getActorPath match {
|
||||
case RemoteActorPath(`remoteAddress`, elems) if elems.size > 0 && elems.head == "remote" ⇒
|
||||
// TODO RK canonicalize path so as not to duplicate it always
|
||||
val subpath = elems.drop(1)
|
||||
val path = remote.remoteDaemon.path / subpath
|
||||
val supervisor = remote.system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
|
||||
val actor = new LocalActorRef(remote.system, Props(creator = actorFactory), supervisor, path, true)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
remote.system.deathWatch.subscribe(this, actor)
|
||||
case _ ⇒
|
||||
log.error("remote path does not match path from message [{}]", message)
|
||||
}
|
||||
|
||||
sender ! Success(remoteAddress)
|
||||
} catch {
|
||||
case exc: Exception ⇒
|
||||
sender ! Failure(exc)
|
||||
throw exc
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// FIXME implement handleRelease
|
||||
|
|
@ -201,45 +195,47 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
/*
|
||||
* generate name for temporary actor refs
|
||||
*/
|
||||
private val tempNumber = new AtomicLong
|
||||
def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
|
||||
def tempPath = remoteDaemon.path / tempName
|
||||
|
||||
// FIXME: handle real remote supervision, ticket #1408
|
||||
def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
|
||||
}
|
||||
|
||||
// FIXME: handle real remote supervision, ticket #1408
|
||||
def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
|
||||
}
|
||||
|
||||
// FIXME: handle real remote supervision, ticket #1408
|
||||
def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
||||
}
|
||||
|
||||
// FIXME: handle real remote supervision, ticket #1408
|
||||
def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
|
||||
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
|
||||
}
|
||||
// private val tempNumber = new AtomicLong
|
||||
// def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
|
||||
// def tempPath = remote.remoteDaemon.path / tempName
|
||||
//
|
||||
// // FIXME: handle real remote supervision, ticket #1408
|
||||
// def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
// new LocalActorRef(remote.system,
|
||||
// Props(
|
||||
// context ⇒ {
|
||||
// case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
|
||||
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
|
||||
// }
|
||||
//
|
||||
// // FIXME: handle real remote supervision, ticket #1408
|
||||
// def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
|
||||
// implicit val s = sender
|
||||
// new LocalActorRef(remote.system,
|
||||
// Props(
|
||||
// context ⇒ {
|
||||
// case f: Function0[_] ⇒ try { context.sender ! f() } finally { context.self.stop() }
|
||||
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Any]])
|
||||
// }
|
||||
//
|
||||
// // FIXME: handle real remote supervision, ticket #1408
|
||||
// def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
// new LocalActorRef(remote.system,
|
||||
// Props(
|
||||
// context ⇒ {
|
||||
// case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
|
||||
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
||||
// }
|
||||
//
|
||||
// // FIXME: handle real remote supervision, ticket #1408
|
||||
// def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
|
||||
// implicit val s = sender
|
||||
// new LocalActorRef(remote.system,
|
||||
// Props(
|
||||
// context ⇒ {
|
||||
// case (fun: Function[_, _], param: Any) ⇒ try { context.sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
|
||||
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
|
||||
// }
|
||||
|
||||
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
|
||||
// val (from, to) = payloadFor(message, classOf[(InetSocketremoteDaemonServiceName, InetSocketremoteDaemonServiceName)])
|
||||
|
|
@ -247,7 +243,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
}
|
||||
|
||||
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
|
||||
serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
|
||||
remote.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[T]
|
||||
}
|
||||
|
|
@ -275,6 +271,8 @@ trait RemoteMarshallingOps {
|
|||
|
||||
def system: ActorSystem
|
||||
|
||||
def remote: Remote
|
||||
|
||||
protected def useUntrustedMode: Boolean
|
||||
|
||||
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
|
||||
|
|
@ -312,7 +310,18 @@ trait RemoteMarshallingOps {
|
|||
def receiveMessage(remoteMessage: RemoteMessage) {
|
||||
log.debug("received message {}", remoteMessage)
|
||||
|
||||
val remoteDaemon = remote.remoteDaemon
|
||||
|
||||
remoteMessage.recipient match {
|
||||
case `remoteDaemon` ⇒
|
||||
remoteMessage.payload match {
|
||||
case m: RemoteSystemDaemonMessageProtocol ⇒
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
try remoteDaemon ! m catch {
|
||||
case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m.getMessageType(), remoteMessage.sender)
|
||||
}
|
||||
case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
|
||||
}
|
||||
case l @ (_: LocalActorRef | _: MinimalActorRef) ⇒
|
||||
remoteMessage.payload match {
|
||||
case msg: SystemMessage ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue