From d64b2a7292ff585742fb93a3b5a04a3b92c1ffe0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 28 Oct 2011 23:11:35 +0200 Subject: [PATCH] All green, fixing issues with the new ask implementation and remoting --- .../src/main/scala/akka/AkkaApplication.scala | 2 +- .../src/main/scala/akka/actor/ActorRef.scala | 26 +++++------ .../scala/akka/actor/ActorRefProvider.scala | 5 +++ .../src/main/scala/akka/dispatch/Future.scala | 2 +- .../scala/akka/remote/RemoteInterface.scala | 45 ------------------- .../src/main/scala/akka/remote/Remote.scala | 11 ++--- .../akka/remote/RemoteActorRefProvider.scala | 34 +++++++------- .../akka/remote/RemoteConnectionManager.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 17 +++---- .../serialization/SerializationProtocol.scala | 13 ++---- 10 files changed, 48 insertions(+), 109 deletions(-) diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 28cc0a5668..cd74495a5a 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -156,7 +156,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor case value ⇒ value.toInt } - val defaultAddress = new InetSocketAddress(hostname, AkkaConfig.RemoteServerPort) + val defaultAddress = new InetSocketAddress(hostname, port) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6d6e738602..89c388ce17 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -10,6 +10,7 @@ import scala.collection.immutable.Stack import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.AkkaApplication import akka.event.ActorEventBus +import akka.serialization.Serialization /** * ActorRef is an immutable and serializable handle to an Actor. @@ -50,8 +51,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def address: String - private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS - /** * Comparison only takes address into account. */ @@ -150,7 +149,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * @author Jonas Bonér */ class LocalActorRef private[akka] ( - app: AkkaApplication, + _app: AkkaApplication, props: Props, _supervisor: ActorRef, _givenAddress: String, @@ -165,7 +164,7 @@ class LocalActorRef private[akka] ( case other ⇒ other } - private[this] val actorCell = new ActorCell(app, this, props, _supervisor, receiveTimeout, hotswap) + private[this] val actorCell = new ActorCell(_app, this, props, _supervisor, receiveTimeout, hotswap) actorCell.start() /** @@ -233,20 +232,14 @@ class LocalActorRef private[akka] ( protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit = actorCell.postMessageToMailbox(message, sender) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = app.provider.ask(message, this, timeout) + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout) protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail) protected[akka] def restart(cause: Throwable): Unit = actorCell.restart(cause) - // ========= PRIVATE FUNCTIONS ========= - @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = { - // TODO: this was used to really send LocalActorRef across the network, which is broken now - val inetaddr = app.defaultAddress - SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort) - } + private def writeReplace(): AnyRef = actorCell.provider.serialize(this) } /** @@ -284,6 +277,8 @@ trait ScalaActorRef { ref: ActorRef ⇒ protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit protected[akka] def restart(cause: Throwable): Unit + + private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS } /** @@ -357,7 +352,7 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { case class DeadLetter(message: Any, sender: ActorRef) -class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { +class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) override val address: String = "akka:internal:DeadLetterActorRef" @@ -368,7 +363,7 @@ class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { def ?(message: Any)(implicit timeout: Timeout): Future[Any] = brokenPromise } -abstract class AskActorRef(app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef { +abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef { final val result = new DefaultPromise[Any](timeout)(dispatcher) { @@ -396,4 +391,7 @@ abstract class AskActorRef(app: AkkaApplication)(timeout: Timeout = app.AkkaConf override def isShutdown = result.isCompleted || result.isExpired override def stop(): Unit = if (!isShutdown) result.completeWithException(new ActorKilledException("Stopped")) + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = app.provider.serialize(this) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 81c4b66afc..f05fbbe86a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -30,6 +30,7 @@ trait ActorRefProvider { private[akka] def evict(address: String): Boolean private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] + private[akka] def serialize(actor: ActorRef): AnyRef private[akka] def createDeathWatch(): DeathWatch @@ -100,6 +101,8 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { @volatile var stopped = false + def app = LocalActorRefProvider.this.app + override def address = app.name + ":BubbleWalker" override def toString = address @@ -216,6 +219,8 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) + private[akka] def serialize(actor: ActorRef): AnyRef = + SerializedActorRef(actor.uuid, actor.address, app.hostname, app.port) private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 86a998b350..6453403ce7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -464,7 +464,7 @@ sealed trait Future[+T] extends japi.Future[T] { try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch { case c: ClassCastException ⇒ if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure) - else throw new ClassCastException("" + v + " of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure) + else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure) } } } diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 99cfbccc0c..c0e4a3a730 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -48,51 +48,6 @@ trait RemoteModule { if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) actorRefOrNull } - - /* - private[akka] def findActorByAddress(address: String): ActorRef = { - val cachedActorRef = actors.get(address) - if (cachedActorRef ne null) cachedActorRef - else { - val actorRef = - Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, router, _, Cluster(home, _, _))) ⇒ - - if (DeploymentConfig.isHomeNode(home)) { // on home node - Actor.registry.actorFor(address) match { // try to look up in actor registry - case Some(actorRef) ⇒ // in registry -> DONE - actorRef - case None ⇒ // not in registry -> check out as 'ref' from cluster (which puts it in actor registry for next time around) - Actor.cluster.ref(address, DeploymentConfig.routerTypeFor(router)) - } - } else throw new IllegalActorStateException("Trying to look up remote actor on non-home node. FIXME: fix this behavior") - - case Some(Deploy(_, _, _, Local)) ⇒ - Actor.registry.actorFor(address).getOrElse(throw new IllegalActorStateException("Could not lookup locally deployed actor in actor registry")) - - case _ ⇒ - actors.get(address) // FIXME do we need to fall back to local here? If it is not clustered then it should not be a remote actor in the first place. Throw exception. - } - - actors.put(address, actorRef) // cache it for next time around - actorRef - } - } - - private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid) - - private[akka] def findActorFactory(address: String): () ⇒ ActorRef = actorsFactories.get(address) - - private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = { - // find by address - var actorRefOrNull = - if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) // FIXME remove lookup by UUID? probably - else findActorByAddress(address) - // find by uuid - if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) - actorRefOrNull - } - */ } /** diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 2d78b2f306..7e3bb86a78 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -55,8 +55,7 @@ class Remote(val app: AkkaApplication) extends RemoteService { private[remote] lazy val remoteDaemon = new LocalActorRef( app, - Props(new RemoteSystemDaemon(this)) - .withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)), + Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)), remoteDaemonSupervisor, remoteDaemonServiceName, systemService = true) @@ -118,8 +117,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { def receive: Actor.Receive = { case message: RemoteSystemDaemonMessageProtocol ⇒ - eventHandler.debug(this, - "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message, nodename)) + eventHandler.debug(this, "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message, nodename)) message.getMessageType match { case USE ⇒ handleUse(message) @@ -145,8 +143,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { if (message.hasActorAddress) { val actorFactoryBytes = - if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) - else message.getPayload.toByteArray + if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray val actorFactory = serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { @@ -165,7 +162,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { sender ! Success(address.toString) } catch { - case error: Throwable ⇒ + case error: Throwable ⇒ //FIXME doesn't seem sensible sender ! Failure(error) throw error } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4bc406a226..dedf5a4044 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -156,13 +156,14 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider * Returns true if the actor was in the provider's cache and evicted successfully, else false. */ private[akka] def evict(address: String): Boolean = actors.remove(address) ne null - - private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { - local.actorFor(actor.address) orElse { - Some(RemoteActorRef(remote.server, new InetSocketAddress(actor.hostname, actor.port), actor.address, None)) - } + private[akka] def serialize(actor: ActorRef): AnyRef = actor match { + case r: RemoteActorRef ⇒ SerializedActorRef(actor.uuid, actor.address, r.remoteAddress.getAddress.getHostAddress, r.remoteAddress.getPort) + case other ⇒ local.serialize(actor) } + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = + local.actorFor(actor.address) orElse Some(RemoteActorRef(remote.server, new InetSocketAddress(actor.hostname, actor.port), actor.address, None)) + /** * Using (checking out) actor on a specific node. */ @@ -171,10 +172,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider val actorFactoryBytes = app.serialization.serialize(actorFactory) match { - case Left(error) ⇒ throw error - case Right(bytes) ⇒ - if (remote.shouldCompressData) LZF.compress(bytes) - else bytes + case Left(error) ⇒ throw error + case Right(bytes) ⇒ if (remote.shouldCompressData) LZF.compress(bytes) else bytes } val command = RemoteSystemDaemonMessageProtocol.newBuilder @@ -183,9 +182,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider .setPayload(ByteString.copyFrom(actorFactoryBytes)) .build() - val connectionFactory = - () ⇒ remote.server.actorFor( - remote.remoteDaemonServiceName, remoteAddress.getHostName, remoteAddress.getPort) + val connectionFactory = () ⇒ remote.server.actorFor(remote.remoteDaemonServiceName, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) // try to get the connection for the remote address, if not already there then create it val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory) @@ -196,11 +193,12 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) { if (withACK) { try { - (connection ? (command, remote.remoteSystemDaemonAckTimeout)).as[Status] match { - case Some(Success(receiver)) ⇒ + val f = connection ? (command, remote.remoteSystemDaemonAckTimeout) + (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match { + case Some(Right(receiver)) ⇒ app.eventHandler.debug(this, "Remote system command sent to [%s] successfully received".format(receiver)) - case Some(Failure(cause)) ⇒ + case Some(Left(cause)) ⇒ app.eventHandler.error(cause, this, cause.toString) throw cause @@ -247,7 +245,7 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported def postMessageToMailbox(message: Any, sender: ActorRef) { - remote.send[Any](message, Some(sender), remoteAddress, this, loader) + remote.send[Any](message, Option(sender), remoteAddress, this, loader) } def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) @@ -266,9 +264,7 @@ private[akka] case class RemoteActorRef private[akka] ( } @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = { - SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) - } + private def writeReplace(): AnyRef = app.provider.serialize(this) def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index f8837ec4f4..a2a5ef4a93 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -31,7 +31,7 @@ class RemoteConnectionManager( def iterable: Iterable[ActorRef] = connections.values } - val failureDetector = remote.failureDetector + def failureDetector = remote.failureDetector private val state: AtomicReference[State] = new AtomicReference[State](newState()) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 16c910dfcd..66ce5a5334 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -159,7 +159,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send[T](message: Any, senderOption: Option[ActorRef], remoteAddress: InetSocketAddress, actorRef: ActorRef) { - val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build + val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Option(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build send(messageProtocol) } @@ -862,19 +862,12 @@ class RemoteServerHandler( return } - val message = MessageSerializer.deserialize(app, request.getMessage) val sender = if (request.hasSender) serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) else app.deadLetters - message match { - // first match on system messages - case _: Terminate ⇒ - if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop() - - case _: AutoReceivedMessage if (UNTRUSTED_MODE) ⇒ - throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - - case _ ⇒ // then match on user defined messages - actorRef.!(message)(sender) + MessageSerializer.deserialize(app, request.getMessage) match { + case _: Terminate ⇒ if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop() + case _: AutoReceivedMessage if (UNTRUSTED_MODE) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ actorRef.!(m)(sender) } } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 75415b9be1..bea53aee16 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -60,11 +60,9 @@ class RemoteActorSerialization(remote: RemoteSupport) { val remoteAddress = actor match { case ar: RemoteActorRef ⇒ ar.remoteAddress - case ar: LocalActorRef ⇒ - remote.registerByUuid(ar) - remote.app.defaultAddress - case _ ⇒ - remote.app.defaultAddress + case ar: ActorRef ⇒ + remote.register(ar) //FIXME stop doing this and delegate to provider.actorFor in the NettyRemoting + remote.app.defaultAddress //FIXME Shouldn't this be the _current_ address of the remoting? } remote.app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) @@ -89,10 +87,7 @@ class RemoteActorSerialization(remote: RemoteSupport) { case Right(protocol) ⇒ protocol } - val actorInfoBuilder = ActorInfoProtocol.newBuilder - .setUuid(uuidProtocol) - .setAddress(actorAddress) - .setTimeout(timeout) + val actorInfoBuilder = ActorInfoProtocol.newBuilder.setUuid(uuidProtocol).setAddress(actorAddress).setTimeout(timeout) val actorInfo = actorInfoBuilder.build val messageBuilder = RemoteMessageProtocol.newBuilder