diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8556f8e7ac..dadf020236 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -13,7 +13,12 @@ import akka.serialization.RemoteActorSerialization._ import akka.japi.Creator import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} +import akka.actor.{PoisonPill, EventHandler, Index, + ActorInitializationException, LocalActorRef, newUuid, + ActorRegistry, Actor, RemoteActorRef, + TypedActor, ActorRef, IllegalActorStateException, + RemoteActorSystemMessage, uuidFrom, Uuid, + Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException import akka.actor.Actor._ import akka.util._ @@ -193,7 +198,7 @@ abstract class RemoteClient private[akka] ( actorRef.id, actorRef.actorClassName, actorRef.timeout, - Left(message), + Right(message), isOneWay, senderOption, typedActorInfo, @@ -810,8 +815,10 @@ class RemoteServerHandler( // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { - try { actor.stop } catch { case e: Exception => } + try { actor ! PoisonPill } catch { case e: Exception => } } + + //FIXME switch approach or use other thread to execute this // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { @@ -886,22 +893,17 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). - onComplete(f => { - val result = f.result - val exception = f.exception - - if (exception.isDefined) { - write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) - } - else if (result.isDefined) { - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout). + onComplete(_.value.get match { + case l: Left[Throwable, Any] => write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor)) + case r: Right[Throwable, Any] => + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), actorInfo.getId, actorInfo.getTarget, actorInfo.getTimeout, - Left(result.get), + r, true, Some(actorRef), None, @@ -912,7 +914,6 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) write(channel, RemoteEncoder.encode(messageBuilder.build)) - } } ) )) @@ -957,10 +958,10 @@ class RemoteServerHandler( try { val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) - if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) + if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread else { //Sends the response - def sendResponse(result: Either[Any,Throwable]): Unit = try { + def sendResponse(result: Either[Throwable,Any]): Unit = try { val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( None, Right(request.getUuid), @@ -982,14 +983,10 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) } - messageReceiver.invoke(typedActor, args: _*) match { - case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed - f.onComplete( future => { - val result: Either[Any,Throwable] = - if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) - sendResponse(result) - }) - case other => sendResponse(Left(other)) + messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread + //If it's a future, we can lift on that to defer the send to when the future is completed + case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) ) + case other => sendResponse(Right(other)) } } } catch { @@ -1152,7 +1149,7 @@ class RemoteServerHandler( actorInfo.getId, actorInfo.getTarget, actorInfo.getTimeout, - Right(exception), + Left(exception), true, None, None, diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 12ee2228f5..7ad0c1e443 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -138,7 +138,7 @@ object ActorSerialization { actorRef.id, actorRef.actorClassName, actorRef.timeout, - Left(m.message), + Right(m.message), false, actorRef.getSender, None, @@ -279,7 +279,7 @@ object RemoteActorSerialization { actorId: String, actorClassName: String, timeout: Long, - message: Either[Any, Throwable], + message: Either[Throwable, Any], isOneWay: Boolean, senderOption: Option[ActorRef], typedActorInfo: Option[Tuple2[String, String]], @@ -319,9 +319,9 @@ object RemoteActorSerialization { .setOneWay(isOneWay) message match { - case Left(message) => + case Right(message) => messageBuilder.setMessage(MessageSerializer.serialize(message)) - case Right(exception) => + case Left(exception) => messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) .setMessage(empty(exception.getMessage))