Merge with master
This commit is contained in:
commit
d4d0fe3338
5 changed files with 62 additions and 45 deletions
|
|
@ -71,4 +71,4 @@ class AkkaBroadcaster extends org.atmosphere.util.SimpleBroadcaster with Logging
|
|||
log.info("broadcast(r,e) called")
|
||||
actor send AkkaBroadcast(r,e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,10 +16,10 @@ import se.scalablesolutions.akka.Config._
|
|||
trait BootableActorLoaderService extends Bootable with Logging {
|
||||
|
||||
val BOOT_CLASSES = config.getList("akka.boot")
|
||||
var applicationLoader: Option[ClassLoader] = None
|
||||
lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader
|
||||
|
||||
protected def runApplicationBootClasses : Option[ClassLoader] = {
|
||||
val loader =
|
||||
protected def createApplicationClassLoader : Option[ClassLoader] = {
|
||||
Some(
|
||||
if (HOME.isDefined) {
|
||||
val CONFIG = HOME.get + "/config"
|
||||
val DEPLOY = HOME.get + "/deploy"
|
||||
|
|
@ -35,16 +35,18 @@ trait BootableActorLoaderService extends Bootable with Logging {
|
|||
getClass.getClassLoader
|
||||
} else throw new IllegalStateException(
|
||||
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
|
||||
for (clazz <- BOOT_CLASSES) {
|
||||
)
|
||||
}
|
||||
|
||||
abstract override def onLoad = {
|
||||
for (loader <- applicationLoader;
|
||||
clazz <- BOOT_CLASSES)
|
||||
{
|
||||
log.info("Loading boot class [%s]", clazz)
|
||||
loader.loadClass(clazz).newInstance
|
||||
}
|
||||
Some(loader)
|
||||
}
|
||||
|
||||
abstract override def onLoad = {
|
||||
applicationLoader = runApplicationBootClasses
|
||||
super.onLoad
|
||||
super.onLoad
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
|
|
|
|||
|
|
@ -23,12 +23,17 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
|||
def startRemoteService = remoteServerThread.start
|
||||
|
||||
abstract override def onLoad = {
|
||||
super.onLoad //Make sure the actors facility is loaded before we load the remote service
|
||||
if(config.getBool("akka.remote.server.service", true)){
|
||||
log.info("Starting up Cluster Service")
|
||||
Cluster.start
|
||||
super.onLoad //Initialize BootableActorLoaderService before remote service
|
||||
log.info("Initializing Remote Actors Service...")
|
||||
startRemoteService
|
||||
log.info("Remote Actors Service initialized!")
|
||||
}
|
||||
else
|
||||
super.onLoad
|
||||
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
|
|
@ -36,9 +41,8 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
|||
if (remoteServerThread.isAlive) {
|
||||
log.info("Shutting down Remote Actors Service")
|
||||
RemoteNode.shutdown
|
||||
log.info("Shutting down Cluster Service")
|
||||
Cluster.shutdown
|
||||
remoteServerThread.join(1000)
|
||||
}
|
||||
Cluster.shutdown
|
||||
}
|
||||
}
|
||||
|
|
@ -199,40 +199,40 @@ abstract class BasicClusterActor extends ClusterActor {
|
|||
* Loads a specified ClusterActor and delegates to that instance.
|
||||
*/
|
||||
object Cluster extends Cluster with Logging {
|
||||
|
||||
private[remote] val actorName = config.getString("akka.remote.cluster.actor")
|
||||
|
||||
private[remote] val clusterActor = createClusterActor
|
||||
|
||||
private[remote] def createClusterActor : Option[ClusterActor] = {
|
||||
try {
|
||||
actorName map { s =>
|
||||
val a = Class.forName(s).newInstance.asInstanceOf[ClusterActor]
|
||||
a.start
|
||||
a
|
||||
}
|
||||
}
|
||||
catch {
|
||||
case e => log.error(e,"Couldn't load Cluster provider: [%s]",actorName.getOrElse(""))
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
private[remote] val supervisor: Option[Supervisor] = if (clusterActor.isDefined) {
|
||||
val sup = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
|
||||
Supervise(clusterActor.get, LifeCycle(Permanent)) :: Nil)
|
||||
).newInstance
|
||||
sup.start
|
||||
Some(sup)
|
||||
} else None
|
||||
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
||||
@volatile private[remote] var supervisor: Option[Supervisor] = None
|
||||
|
||||
private[remote] lazy val serializer: Serializer = {
|
||||
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
|
||||
Class.forName(className).newInstance.asInstanceOf[Serializer]
|
||||
}
|
||||
|
||||
private[remote] def createClusterActor : Option[ClusterActor] = {
|
||||
val name = config.getString("akka.remote.cluster.actor")
|
||||
|
||||
try {
|
||||
name map { fqn =>
|
||||
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
||||
a.start
|
||||
a
|
||||
}
|
||||
}
|
||||
catch {
|
||||
case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None
|
||||
}
|
||||
}
|
||||
|
||||
private[remote] def createSupervisor(actor : ClusterActor) : Option[Supervisor] = {
|
||||
val sup = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
|
||||
Supervise(actor, LifeCycle(Permanent)) :: Nil)
|
||||
).newInstance
|
||||
sup.start
|
||||
Some(sup)
|
||||
}
|
||||
|
||||
|
||||
def name = clusterActor.map(_.name).getOrElse("No cluster")
|
||||
|
||||
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf))
|
||||
|
|
@ -243,5 +243,19 @@ object Cluster extends Cluster with Logging {
|
|||
|
||||
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
|
||||
|
||||
def shutdown = supervisor.foreach(_.stop)
|
||||
def start : Unit = synchronized {
|
||||
if(supervisor.isEmpty) {
|
||||
for(actor <- createClusterActor;
|
||||
sup <- createSupervisor(actor)) {
|
||||
clusterActor = Some(actor)
|
||||
supervisor = Some(sup)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def shutdown : Unit = synchronized {
|
||||
supervisor.foreach(_.stop)
|
||||
supervisor = None
|
||||
clusterActor = None
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,8 +112,6 @@ object RemoteServer {
|
|||
|
||||
private[remote] def unregister(hostname: String, port: Int) =
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
|
||||
private[remote] def canShutDownCluster: Boolean = remoteServers.isEmpty
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -186,7 +184,6 @@ class RemoteServer extends Logging {
|
|||
openChannels.close.awaitUninterruptibly(1000)
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
if (RemoteServer.canShutDownCluster) Cluster.shutdown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue