Added server initiated remote untyped session actors
now you can register a factory function and whenever a new session starts, the actor will be created and started. When the client disconnects, the actor will be stopped. The client works the same as any other untyped remote server managed actor.
use like this:
RemoteServer.registerPerSession("actor-name", actorOf[MyActor])
Unregister like this:
RemoteServer.unregisterPerSession("actor-name")
This commit is contained in:
parent
26f23ac5ec
commit
376d1c9dfb
2 changed files with 138 additions and 34 deletions
|
|
@ -458,7 +458,7 @@ class RemoteServerHandler(
|
|||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
val CHANNEL_INIT = "channel-init".intern
|
||||
|
||||
val sessionActors = new ChannelLocal();
|
||||
val sessionActors = new ChannelLocal[Map[String, ActorRef]]();
|
||||
|
||||
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
||||
|
||||
|
|
@ -470,6 +470,7 @@ class RemoteServerHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
sessionActors.set(event.getChannel(), Map[String, ActorRef]());
|
||||
log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name)
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
|
@ -489,6 +490,11 @@ class RemoteServerHandler(
|
|||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name)
|
||||
// stop all session actors
|
||||
for ((id, actorRef) <- sessionActors.get(event.getChannel())) {
|
||||
actorRef.stop()
|
||||
}
|
||||
sessionActors.remove(event.getChannel());
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
|
||||
}
|
||||
|
||||
|
|
@ -543,7 +549,7 @@ class RemoteServerHandler(
|
|||
|
||||
val actorRef =
|
||||
try {
|
||||
createActor(actorInfo).start
|
||||
createActor(actorInfo, channel).start
|
||||
} catch {
|
||||
case e: SecurityException =>
|
||||
channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
||||
|
|
@ -664,6 +670,13 @@ class RemoteServerHandler(
|
|||
server.actorsByUuid.get(uuid)
|
||||
}
|
||||
|
||||
private def findActorFactory(id: String) : () => ActorRef = {
|
||||
server.actorsFactories.get(id)
|
||||
}
|
||||
private def findSessionActor(id: String, channel: Channel) : ActorRef = {
|
||||
sessionActors.get(channel).getOrElse(id, null)
|
||||
}
|
||||
|
||||
private def findTypedActorById(id: String) : AnyRef = {
|
||||
server.typedActors.get(id)
|
||||
}
|
||||
|
|
@ -693,7 +706,7 @@ class RemoteServerHandler(
|
|||
*
|
||||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
|
||||
|
|
@ -702,28 +715,47 @@ class RemoteServerHandler(
|
|||
|
||||
val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
||||
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
if (actorRefOrNull ne null)
|
||||
return actorRefOrNull
|
||||
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
} else actorRefOrNull
|
||||
|
||||
// the actor has not been registered globally. See if we have it in the session
|
||||
|
||||
val sessionActorRefOrNull = findSessionActor(id, channel);
|
||||
if (sessionActorRefOrNull ne null)
|
||||
return sessionActorRefOrNull
|
||||
|
||||
// we dont have it in the session either, see if we have a factory for it
|
||||
val actorFactoryOrNull = findActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
val actorRef = actorFactoryOrNull();
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
sessionActors.get(channel).put(id, actorRef);
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
return actorRef
|
||||
}
|
||||
|
||||
// None of the above, so treat it as a client managed remote actor
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue