Adding lifecycle messages and listenability to RemoteServer
This commit is contained in:
parent
4ba859cb95
commit
7d289dfb31
3 changed files with 76 additions and 34 deletions
|
|
@ -26,6 +26,7 @@ import org.jboss.netty.handler.ssl.SslHandler
|
|||
|
||||
|
||||
import scala.collection.mutable.Map
|
||||
import reflect.BeanProperty
|
||||
|
||||
/**
|
||||
* Use this object if you need a single remote server on a specific node.
|
||||
|
|
@ -160,6 +161,28 @@ object RemoteServer {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteServer.
|
||||
*/
|
||||
sealed trait RemoteServerLifeCycleEvent
|
||||
case class RemoteServerError(
|
||||
@BeanProperty val cause: Throwable,
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerShutdown(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerStarted(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
|
||||
/*FIXME NOT SUPPORTED YET
|
||||
case class RemoteServerClientConnected(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientDisconnected(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent*/
|
||||
|
||||
/**
|
||||
* Use this class if you need a more than one remote server on a specific node.
|
||||
*
|
||||
|
|
@ -176,11 +199,11 @@ object RemoteServer {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteServer extends Logging {
|
||||
class RemoteServer extends Logging with ListenerManagement {
|
||||
val name = "RemoteServer@" + hostname + ":" + port
|
||||
|
||||
private var hostname = RemoteServer.HOSTNAME
|
||||
private var port = RemoteServer.PORT
|
||||
private[akka] var hostname = RemoteServer.HOSTNAME
|
||||
private[akka] var port = RemoteServer.PORT
|
||||
|
||||
@volatile private var _isRunning = false
|
||||
|
||||
|
|
@ -222,7 +245,7 @@ class RemoteServer extends Logging {
|
|||
RemoteServer.register(hostname, port, this)
|
||||
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(
|
||||
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors)
|
||||
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors,this)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
bootstrap.setOption("child.keepAlive", true)
|
||||
|
|
@ -231,9 +254,12 @@ class RemoteServer extends Logging {
|
|||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||
_isRunning = true
|
||||
Cluster.registerLocalNode(hostname, port)
|
||||
foreachListener(_ ! RemoteServerStarted(hostname,port))
|
||||
}
|
||||
} catch {
|
||||
case e => log.error(e, "Could not start up remote server")
|
||||
case e =>
|
||||
log.error(e, "Could not start up remote server")
|
||||
foreachListener(_ ! RemoteServerError(e,hostname,port))
|
||||
}
|
||||
this
|
||||
}
|
||||
|
|
@ -246,6 +272,7 @@ class RemoteServer extends Logging {
|
|||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
foreachListener(_ ! RemoteServerShutdown(hostname,port))
|
||||
} catch {
|
||||
case e: java.nio.channels.ClosedChannelException => {}
|
||||
case e => log.warning("Could not close remote server channel in a graceful way")
|
||||
|
|
@ -302,6 +329,8 @@ class RemoteServer extends Logging {
|
|||
if (actorRef.registeredInRemoteNodeDuringSerialization) actors.remove(actorRef.uuid)
|
||||
}
|
||||
}
|
||||
|
||||
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
|
||||
}
|
||||
|
||||
object RemoteServerSslContext {
|
||||
|
|
@ -327,7 +356,8 @@ class RemoteServerPipelineFactory(
|
|||
val openChannels: ChannelGroup,
|
||||
val loader: Option[ClassLoader],
|
||||
val actors: JMap[String, ActorRef],
|
||||
val typedActors: JMap[String, AnyRef]) extends ChannelPipelineFactory {
|
||||
val typedActors: JMap[String, AnyRef],
|
||||
val server: RemoteServer) extends ChannelPipelineFactory {
|
||||
import RemoteServer._
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
|
|
@ -347,7 +377,7 @@ class RemoteServerPipelineFactory(
|
|||
case _ => (join(), join())
|
||||
}
|
||||
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors)
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors,server)
|
||||
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
|
|
@ -362,7 +392,8 @@ class RemoteServerHandler(
|
|||
val openChannels: ChannelGroup,
|
||||
val applicationLoader: Option[ClassLoader],
|
||||
val actors: JMap[String, ActorRef],
|
||||
val typedActors: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
|
||||
val typedActors: JMap[String, AnyRef],
|
||||
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
||||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
|
||||
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
||||
|
|
@ -406,9 +437,9 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
event.getCause.printStackTrace
|
||||
log.error(event.getCause, "Unexpected exception from remote downstream")
|
||||
event.getChannel.close
|
||||
server.foreachListener(_ ! RemoteServerError(event.getCause,server.hostname,server.port))
|
||||
}
|
||||
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
|
|
@ -424,8 +455,7 @@ class RemoteServerHandler(
|
|||
val actorInfo = request.getActorInfo
|
||||
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
|
||||
|
||||
val actorRef = createActor(actorInfo)
|
||||
actorRef.start
|
||||
val actorRef = createActor(actorInfo).start
|
||||
|
||||
val message = MessageSerializer.deserialize(request.getMessage)
|
||||
val sender =
|
||||
|
|
@ -452,7 +482,9 @@ class RemoteServerHandler(
|
|||
channel.write(replyBuilder.build)
|
||||
|
||||
} catch {
|
||||
case e: Throwable => channel.write(createErrorReplyMessage(e, request, true))
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, true))
|
||||
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -482,8 +514,12 @@ class RemoteServerHandler(
|
|||
channel.write(replyBuilder.build)
|
||||
}
|
||||
} catch {
|
||||
case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
case e: Throwable => channel.write(createErrorReplyMessage(e, request, false))
|
||||
case e: InvocationTargetException =>
|
||||
channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port))
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, false))
|
||||
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -515,6 +551,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port))
|
||||
throw e
|
||||
}
|
||||
} else actorRefOrNull
|
||||
|
|
@ -522,7 +559,7 @@ class RemoteServerHandler(
|
|||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val typedActorOrNull = typedActors.get(uuid)
|
||||
val typedActorOrNull = typedActors get uuid
|
||||
|
||||
if (typedActorOrNull eq null) {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
|
|
@ -542,7 +579,10 @@ class RemoteServerHandler(
|
|||
typedActors.put(uuid, newInstance)
|
||||
newInstance
|
||||
} catch {
|
||||
case e => log.error(e, "Could not create remote typed actor instance"); throw e
|
||||
case e =>
|
||||
log.error(e, "Could not create remote typed actor instance")
|
||||
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port))
|
||||
throw e
|
||||
}
|
||||
} else typedActorOrNull
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue