diff --git a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala index 3d90d0f417..45500b561f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala @@ -56,7 +56,7 @@ case class RemoteClientShutdown( */ class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) -case class UnparsableException private[akka] (val originalClassName: String, val originalMessage: String) extends AkkaException(originalMessage) +case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage) /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. @@ -416,32 +416,33 @@ class RemoteClientHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { - val result = event.getMessage - if (result.isInstanceOf[RemoteMessageProtocol]) { - val reply = result.asInstanceOf[RemoteMessageProtocol] - val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) - log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) - val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] - if (reply.hasMessage) { - if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") - val message = MessageSerializer.deserialize(reply.getMessage) - future.completeWithResult(message) - } else { - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + event.getMessage match { + case reply: RemoteMessageProtocol => + val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) + log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) + val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + + if (reply.hasMessage) { + if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") + val message = MessageSerializer.deserialize(reply.getMessage) + future.completeWithResult(message) + } else { + val exception = parseException(reply, client.loader) + if (reply.hasSupervisorUuid()) { + val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( + "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + val supervisedActor = supervisors.get(supervisorUuid) + if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( + "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) + } + + future.completeWithException(exception) } - val exception = parseException(reply, client.loader) - future.completeWithException(exception) - } - futures remove replyUuid - } else { - val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) + + case message => + val exception = new RemoteClientException("Unknown message received in remote client handler: " + message, client) client.notifyListeners(RemoteClientError(exception, client)) throw exception } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index b93b472f51..d64e0552ab 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -295,7 +295,10 @@ object RemoteActorSerialization { } val actorInfo = actorInfoBuilder.build val messageBuilder = RemoteMessageProtocol.newBuilder - .setUuid(uuidProtocol) + .setUuid({ + val messageUuid = newUuid + UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build + }) .setActorInfo(actorInfo) .setOneWay(isOneWay)