Should do the trick
This commit is contained in:
parent
14579aaaab
commit
e33cf4ddd8
3 changed files with 30 additions and 33 deletions
|
|
@ -24,8 +24,8 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
|||
|
||||
abstract override def onLoad = {
|
||||
if(config.getBool("akka.remote.server.service", true)){
|
||||
Cluster.start
|
||||
super.onLoad //Initialize BootableActorLoaderService before remote service
|
||||
Cluster.start(self.applicationLoader)
|
||||
log.info("Initializing Remote Actors Service...")
|
||||
startRemoteService
|
||||
log.info("Remote Actors Service initialized!")
|
||||
|
|
|
|||
|
|
@ -37,6 +37,10 @@ trait Cluster {
|
|||
*/
|
||||
trait ClusterActor extends Actor with Cluster {
|
||||
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
|
||||
|
||||
@volatile protected var serializer : Serializer = _
|
||||
|
||||
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -110,7 +114,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
|||
|
||||
case m: Message[ADDR_T] => {
|
||||
val (src, msg) = (m.sender, m.msg)
|
||||
(Cluster.serializer in (msg, None)) match {
|
||||
(serializer in (msg, None)) match {
|
||||
|
||||
case PapersPlease => {
|
||||
log debug ("Asked for papers by %s", src)
|
||||
|
|
@ -156,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
|||
* that's been set in the akka-conf
|
||||
*/
|
||||
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
|
||||
lazy val m = Cluster.serializer out msg
|
||||
lazy val m = serializer out msg
|
||||
for (r <- recipients) toOneNode(r, m)
|
||||
}
|
||||
|
||||
|
|
@ -165,7 +169,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
|||
* that's been set in the akka-conf
|
||||
*/
|
||||
protected def broadcast[T <: AnyRef](msg: T): Unit =
|
||||
if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg)
|
||||
if (!remotes.isEmpty) toAllNodes(serializer out msg)
|
||||
|
||||
/**
|
||||
* Applies the given PartialFunction to all known RemoteAddresses
|
||||
|
|
@ -205,23 +209,21 @@ abstract class BasicClusterActor extends ClusterActor {
|
|||
object Cluster extends Cluster with Logging {
|
||||
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
|
||||
|
||||
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
||||
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
||||
|
||||
// FIXME Use the supervisor member field
|
||||
@volatile private[remote] var supervisor: Option[Supervisor] = None
|
||||
|
||||
private[remote] lazy val serializer: Serializer =
|
||||
Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
|
||||
.newInstance.asInstanceOf[Serializer]
|
||||
|
||||
private[remote] def createClusterActor: Option[ClusterActor] = {
|
||||
private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = {
|
||||
val name = config.getString("akka.remote.cluster.actor")
|
||||
if (name.isEmpty) throw new IllegalArgumentException(
|
||||
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
|
||||
|
||||
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
|
||||
serializer setClassLoader loader
|
||||
try {
|
||||
name map {
|
||||
fqn =>
|
||||
Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
||||
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
||||
a setSerializer serializer
|
||||
a
|
||||
}
|
||||
}
|
||||
catch {
|
||||
|
|
@ -251,13 +253,14 @@ object Cluster extends Cluster with Logging {
|
|||
|
||||
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
|
||||
|
||||
def start: Unit = synchronized {
|
||||
def start: Unit = start(None)
|
||||
|
||||
def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized {
|
||||
log.info("Starting up Cluster Service...")
|
||||
if (supervisor.isEmpty) {
|
||||
for (actor <- createClusterActor;
|
||||
sup <- createSupervisor(actor)) {
|
||||
if (clusterActor.isEmpty) {
|
||||
for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
|
||||
sup <- createSupervisor(actor) } {
|
||||
clusterActor = Some(actor)
|
||||
supervisor = Some(sup)
|
||||
sup.start
|
||||
}
|
||||
}
|
||||
|
|
@ -265,8 +268,10 @@ object Cluster extends Cluster with Logging {
|
|||
|
||||
def shutdown: Unit = synchronized {
|
||||
log.info("Shutting down Cluster Service...")
|
||||
supervisor.foreach(_.stop)
|
||||
supervisor = None
|
||||
for{
|
||||
c <- clusterActor
|
||||
s <- c._supervisor
|
||||
} s.stop
|
||||
clusterActor = None
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,10 @@ trait Serializer {
|
|||
def deepClone(obj: AnyRef): AnyRef
|
||||
def out(obj: AnyRef): Array[Byte]
|
||||
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
|
||||
|
||||
protected var classLoader: Option[ClassLoader] = None
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
||||
}
|
||||
|
||||
// For Java API
|
||||
|
|
@ -52,10 +56,6 @@ object Serializer {
|
|||
*/
|
||||
object Java extends Java
|
||||
class Java extends Serializer {
|
||||
private var classLoader: Option[ClassLoader] = None
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
||||
|
||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
||||
|
||||
def out(obj: AnyRef): Array[Byte] = {
|
||||
|
|
@ -107,10 +107,6 @@ object Serializer {
|
|||
class JavaJSON extends Serializer {
|
||||
private val mapper = new ObjectMapper
|
||||
|
||||
private var classLoader: Option[ClassLoader] = None
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
||||
|
||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
|
||||
|
||||
def out(obj: AnyRef): Array[Byte] = {
|
||||
|
|
@ -143,10 +139,6 @@ object Serializer {
|
|||
class ScalaJSON extends Serializer {
|
||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
||||
|
||||
private var classLoader: Option[ClassLoader] = None
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
||||
|
||||
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
|
||||
|
||||
// FIXME set ClassLoader on SJSONSerializer.SJSON
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue