diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala index 6dd418b046..1d79978559 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala @@ -326,32 +326,25 @@ class RemoteServer extends Logging with ListenerManagement { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, actorRef); + if (_isRunning) { + registry.put(id, actorRef) //TODO change to putIfAbsent if (!actorRef.isRunning) actorRef.start } } private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) - } + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, typedActor) - } + if (_isRunning) + registry.put(id, typedActor) //TODO change to putIfAbsent } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) - } + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent } /** @@ -531,18 +524,21 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - val channelActors = sessionActors.remove(event.getChannel()) - val channelActorsIterator = channelActors.elements() - while (channelActorsIterator.hasMoreElements()) { - channelActorsIterator.nextElement().stop() + val channelActors = sessionActors.remove(event.getChannel) + if (channelActors ne null) { + val channelActorsIterator = channelActors.elements + while (channelActorsIterator.hasMoreElements) { + channelActorsIterator.nextElement.stop + } } - val channelTypedActors = typedSessionActors.remove(event.getChannel()) - val channelTypedActorsIterator = channelTypedActors.elements() - while (channelTypedActorsIterator.hasMoreElements()) { - TypedActor.stop(channelTypedActorsIterator.nextElement()) + val channelTypedActors = typedSessionActors.remove(event.getChannel) + if (channelTypedActors ne null) { + val channelTypedActorsIterator = channelTypedActors.elements + while (channelTypedActorsIterator.hasMoreElements) { + TypedActor.stop(channelTypedActorsIterator.nextElement) + } } - server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -629,7 +625,7 @@ class RemoteServerHandler( val exception = f.exception if (exception.isDefined) { - log.debug("Returning exception from actor invocation [%s]".format(exception.get)) + log.debug("Returning exception from actor invocation [%s]",exception.get) try { channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } catch { @@ -679,26 +675,35 @@ class RemoteServerHandler( val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) else { - val result = messageReceiver.invoke(typedActor, args: _*) match { - case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking - case other => other + //Sends the response + def sendResponse(result: Either[Any,Throwable]): Unit = try { + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + None, + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + result, + true, + None, + None, + AkkaActorType.TypedActor, + None) + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(messageBuilder.build) + log.debug("Returning result from remote typed actor invocation [%s]", result) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) } - log.debug("Returning result from remote typed actor invocation [%s]", result) - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - None, - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Left(result), - true, - None, - None, - AkkaActorType.TypedActor, - None) - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(messageBuilder.build) + messageReceiver.invoke(typedActor, args: _*) match { + case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed + f.onComplete( future => { + val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) + sendResponse(result) + }) + case other => sendResponse(Left(other)) + } } } catch { case e: InvocationTargetException =>