fix: server initiated remote actors not found

removed fixme comment
This commit is contained in:
Michael Kober 2010-09-06 09:35:02 +02:00
parent 3d433304f1
commit 2478e5d6ad
2 changed files with 33 additions and 13 deletions

View file

@ -234,9 +234,8 @@ class RemoteServer extends Logging with ListenerManagement {
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
RemoteServer.register(hostname, port, this)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors,this)
name, openChannels, loader, actors, typedActors, this)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
@ -324,6 +323,13 @@ class RemoteServer extends Logging with ListenerManagement {
protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
private def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
}
private def typedActors() : ConcurrentHashMap[String, AnyRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
}
}
object RemoteServerSslContext {
@ -348,8 +354,8 @@ class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val typedActors: JMap[String, AnyRef],
val actors: (() => ConcurrentHashMap[String, ActorRef]),
val typedActors: (() => ConcurrentHashMap[String, AnyRef]),
val server: RemoteServer) extends ChannelPipelineFactory {
import RemoteServer._
@ -373,7 +379,7 @@ class RemoteServerPipelineFactory(
case _ => (join(), join())
}
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors,server)
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors, server)
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
new StaticChannelPipeline(stages: _*)
}
@ -387,8 +393,8 @@ class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val typedActors: JMap[String, AnyRef],
val actors: (() => ConcurrentHashMap[String, ActorRef]),
val typedActors: (() => ConcurrentHashMap[String, AnyRef]),
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
@ -539,7 +545,8 @@ class RemoteServerHandler(
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val actorRefOrNull = actors get uuid
val registeredActors = actors()
val actorRefOrNull = registeredActors get uuid
if (actorRefOrNull eq null) {
try {
@ -550,7 +557,7 @@ class RemoteServerHandler(
actorRef.uuid = uuid
actorRef.timeout = timeout
actorRef.remoteAddress = None
actors.put(uuid, actorRef)
registeredActors.put(uuid, actorRef)
actorRef
} catch {
case e =>
@ -563,7 +570,8 @@ class RemoteServerHandler(
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = actorInfo.getUuid
val typedActorOrNull = typedActors get uuid
val registeredTypedActors = typedActors()
val typedActorOrNull = registeredTypedActors get uuid
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo
@ -580,7 +588,7 @@ class RemoteServerHandler(
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
typedActors.put(uuid, newInstance)
registeredTypedActors.put(uuid, newInstance)
newInstance
} catch {
case e =>