diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 4c5442de65..57fb3d4078 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -528,11 +528,16 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - for ((id, actorRef) <- sessionActors.remove(event.getChannel())) { - actorRef.stop() + val channelActors = sessionActors.remove(event.getChannel()) + val channelActorsIterator = channelActors.elements() + while (channelActorsIterator.hasMoreElements()) { + channelActorsIterator.nextElement().stop() } - for ((id, actorRef) <- typedSessionActors.remove(event.getChannel())) { - TypedActor.stop(actorRef) + + val channelTypedActors = typedSessionActors.remove(event.getChannel()) + val channelTypedActorsIterator = channelTypedActors.elements() + while (channelTypedActorsIterator.hasMoreElements()) { + TypedActor.stop(channelTypedActorsIterator.nextElement()) } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -749,40 +754,36 @@ class RemoteServerHandler( } /** - * Creates a new instance of the actor with name, uuid and timeout specified as arguments. - * - * If actor already created then just return it from the registry. - * - * Does not start the actor. + * gets the actor from the session, or creates one if there is a factory for it */ - private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { + private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - - val name = actorInfo.getTarget - val timeout = actorInfo.getTimeout - - val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - - if (actorRefOrNull ne null) - return actorRefOrNull - - val sessionActorRefOrNull = findSessionActor(id, channel); if (sessionActorRefOrNull ne null) - return sessionActorRefOrNull - - - // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = findActorFactory(id) - if (actorFactoryOrNull ne null) { - val actorRef = actorFactoryOrNull(); - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - sessionActors.get(channel).put(id, actorRef); - return actorRef + sessionActorRefOrNull + else + { + // we dont have it in the session either, see if we have a factory for it + val actorFactoryOrNull = findActorFactory(id) + if (actorFactoryOrNull ne null) { + val actorRef = actorFactoryOrNull(); + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + sessionActors.get(channel).put(id, actorRef); + actorRef + } + else + null } + } + + + private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + val timeout = actorInfo.getTimeout + val name = actorInfo.getTarget - // None of the above, so treat it as a client managed remote actor try { if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") @@ -803,35 +804,61 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) throw e } + } - private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { + /** + * Creates a new instance of the actor with name, uuid and timeout specified as arguments. + * + * If actor already created then just return it from the registry. + * + * Does not start the actor. + */ + private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (typedActorOrNull ne null) - return typedActorOrNull; + val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - // the actor has not been registered globally. See if we have it in the session + if (actorRefOrNull ne null) + actorRefOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createSessionActor(actorInfo, channel); + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedActor(actorInfo) + } + } + /** + * gets the actor from the session, or creates one if there is a factory for it + */ + private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ + val id = actorInfo.getId val sessionActorRefOrNull = findTypedSessionActor(id, channel); if (sessionActorRefOrNull ne null) - return sessionActorRefOrNull - - // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = findTypedActorFactory(id) - if (actorFactoryOrNull ne null) { - val newInstance = actorFactoryOrNull(); - typedSessionActors.get(channel).put(id, newInstance); - return newInstance + sessionActorRefOrNull + else { + val actorFactoryOrNull = findTypedActorFactory(id) + if (actorFactoryOrNull ne null) { + val newInstance = actorFactoryOrNull(); + typedSessionActors.get(channel).put(id, newInstance); + newInstance + } + else + null } - // None of the above, so treat it as a client managed remote actor + } + private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { val typedActorInfo = actorInfo.getTypedActorInfo val interfaceClassname = typedActorInfo.getInterface val targetClassname = actorInfo.getTarget + val uuid = actorInfo.getUuid try { if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( @@ -856,6 +883,24 @@ class RemoteServerHandler( } } + private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + + val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) + if (typedActorOrNull ne null) + typedActorOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedTypedActor(actorInfo) + } + } + private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = { val actorInfo = request.getActorInfo log.error(exception, "Could not invoke remote actor [%s]", actorInfo.getTarget)