merged with upstream
This commit is contained in:
commit
e519d86ffa
9 changed files with 148 additions and 44 deletions
|
|
@ -30,7 +30,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
|
||||||
}
|
}
|
||||||
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
||||||
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
||||||
new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
|
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
|
||||||
} else getClass.getClassLoader)
|
} else getClass.getClassLoader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,21 +23,19 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
||||||
def startRemoteService = remoteServerThread.start
|
def startRemoteService = remoteServerThread.start
|
||||||
|
|
||||||
abstract override def onLoad = {
|
abstract override def onLoad = {
|
||||||
if(config.getBool("akka.remote.server.service", true)){
|
|
||||||
Cluster.start
|
|
||||||
super.onLoad //Initialize BootableActorLoaderService before remote service
|
super.onLoad //Initialize BootableActorLoaderService before remote service
|
||||||
|
if(config.getBool("akka.remote.server.service", true)){
|
||||||
|
|
||||||
|
if(config.getBool("akka.remote.cluster.service", true))
|
||||||
|
Cluster.start(self.applicationLoader)
|
||||||
|
|
||||||
log.info("Initializing Remote Actors Service...")
|
log.info("Initializing Remote Actors Service...")
|
||||||
startRemoteService
|
startRemoteService
|
||||||
log.info("Remote Actors Service initialized!")
|
log.info("Remote Actors Service initialized!")
|
||||||
}
|
}
|
||||||
else
|
|
||||||
super.onLoad
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract override def onUnload = {
|
abstract override def onUnload = {
|
||||||
super.onUnload
|
|
||||||
|
|
||||||
log.info("Shutting down Remote Actors Service")
|
log.info("Shutting down Remote Actors Service")
|
||||||
|
|
||||||
RemoteNode.shutdown
|
RemoteNode.shutdown
|
||||||
|
|
@ -49,6 +47,8 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
||||||
Cluster.shutdown
|
Cluster.shutdown
|
||||||
|
|
||||||
log.info("Remote Actors Service has been shut down")
|
log.info("Remote Actors Service has been shut down")
|
||||||
|
|
||||||
|
super.onUnload
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -17,16 +17,41 @@ import scala.collection.immutable.{Map, HashMap}
|
||||||
* @author Viktor Klang
|
* @author Viktor Klang
|
||||||
*/
|
*/
|
||||||
trait Cluster {
|
trait Cluster {
|
||||||
|
/**
|
||||||
|
* Specifies the cluster name
|
||||||
|
*/
|
||||||
def name: String
|
def name: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the specified hostname + port as a local node
|
||||||
|
* This information will be propagated to other nodes in the cluster
|
||||||
|
* and will be available at the other nodes through lookup and foreach
|
||||||
|
*/
|
||||||
def registerLocalNode(hostname: String, port: Int): Unit
|
def registerLocalNode(hostname: String, port: Int): Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the specified hostname + port from the local node
|
||||||
|
* This information will be propagated to other nodes in the cluster
|
||||||
|
* and will no longer be available at the other nodes through lookup and foreach
|
||||||
|
*/
|
||||||
def deregisterLocalNode(hostname: String, port: Int): Unit
|
def deregisterLocalNode(hostname: String, port: Int): Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends the message to all Actors of the specified type on all other nodes in the cluster
|
||||||
|
*/
|
||||||
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
|
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Traverses all known remote addresses avaiable at all other nodes in the cluster
|
||||||
|
* and applies the given PartialFunction on the first address that it's defined at
|
||||||
|
* The order of application is undefined and may vary
|
||||||
|
*/
|
||||||
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
|
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Applies the specified function to all known remote addresses on al other nodes in the cluster
|
||||||
|
* The order of application is undefined and may vary
|
||||||
|
*/
|
||||||
def foreach(f: (RemoteAddress) => Unit): Unit
|
def foreach(f: (RemoteAddress) => Unit): Unit
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -37,6 +62,10 @@ trait Cluster {
|
||||||
*/
|
*/
|
||||||
trait ClusterActor extends Actor with Cluster {
|
trait ClusterActor extends Actor with Cluster {
|
||||||
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
|
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
|
||||||
|
|
||||||
|
@volatile protected var serializer : Serializer = _
|
||||||
|
|
||||||
|
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -110,7 +139,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
|
|
||||||
case m: Message[ADDR_T] => {
|
case m: Message[ADDR_T] => {
|
||||||
val (src, msg) = (m.sender, m.msg)
|
val (src, msg) = (m.sender, m.msg)
|
||||||
(Cluster.serializer in (msg, None)) match {
|
(serializer in (msg, None)) match {
|
||||||
|
|
||||||
case PapersPlease => {
|
case PapersPlease => {
|
||||||
log debug ("Asked for papers by %s", src)
|
log debug ("Asked for papers by %s", src)
|
||||||
|
|
@ -156,7 +185,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
* that's been set in the akka-conf
|
* that's been set in the akka-conf
|
||||||
*/
|
*/
|
||||||
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
|
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
|
||||||
lazy val m = Cluster.serializer out msg
|
lazy val m = serializer out msg
|
||||||
for (r <- recipients) toOneNode(r, m)
|
for (r <- recipients) toOneNode(r, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -165,7 +194,7 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
* that's been set in the akka-conf
|
* that's been set in the akka-conf
|
||||||
*/
|
*/
|
||||||
protected def broadcast[T <: AnyRef](msg: T): Unit =
|
protected def broadcast[T <: AnyRef](msg: T): Unit =
|
||||||
if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg)
|
if (!remotes.isEmpty) toAllNodes(serializer out msg)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Applies the given PartialFunction to all known RemoteAddresses
|
* Applies the given PartialFunction to all known RemoteAddresses
|
||||||
|
|
@ -205,23 +234,21 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
object Cluster extends Cluster with Logging {
|
object Cluster extends Cluster with Logging {
|
||||||
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
|
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
|
||||||
|
|
||||||
@volatile private[akka] var clusterActor: Option[ClusterActor] = None
|
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
||||||
|
|
||||||
// FIXME Use the supervisor member field
|
private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = {
|
||||||
@volatile private[akka] var supervisor: Option[Supervisor] = None
|
|
||||||
|
|
||||||
private[akka] lazy val serializer: Serializer =
|
|
||||||
Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
|
|
||||||
.newInstance.asInstanceOf[Serializer]
|
|
||||||
|
|
||||||
private[akka] def createClusterActor: Option[ClusterActor] = {
|
|
||||||
val name = config.getString("akka.remote.cluster.actor")
|
val name = config.getString("akka.remote.cluster.actor")
|
||||||
if (name.isEmpty) throw new IllegalArgumentException(
|
if (name.isEmpty) throw new IllegalArgumentException(
|
||||||
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
|
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
|
||||||
|
|
||||||
|
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
|
||||||
|
serializer setClassLoader loader
|
||||||
try {
|
try {
|
||||||
name map {
|
name map {
|
||||||
fqn =>
|
fqn =>
|
||||||
Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
||||||
|
a setSerializer serializer
|
||||||
|
a
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
|
@ -251,13 +278,14 @@ object Cluster extends Cluster with Logging {
|
||||||
|
|
||||||
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
|
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
|
||||||
|
|
||||||
def start: Unit = synchronized {
|
def start: Unit = start(None)
|
||||||
|
|
||||||
|
def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized {
|
||||||
log.info("Starting up Cluster Service...")
|
log.info("Starting up Cluster Service...")
|
||||||
if (supervisor.isEmpty) {
|
if (clusterActor.isEmpty) {
|
||||||
for (actor <- createClusterActor;
|
for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
|
||||||
sup <- createSupervisor(actor)) {
|
sup <- createSupervisor(actor) } {
|
||||||
clusterActor = Some(actor)
|
clusterActor = Some(actor)
|
||||||
supervisor = Some(sup)
|
|
||||||
sup.start
|
sup.start
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -265,8 +293,10 @@ object Cluster extends Cluster with Logging {
|
||||||
|
|
||||||
def shutdown: Unit = synchronized {
|
def shutdown: Unit = synchronized {
|
||||||
log.info("Shutting down Cluster Service...")
|
log.info("Shutting down Cluster Service...")
|
||||||
supervisor.foreach(_.stop)
|
for{
|
||||||
supervisor = None
|
c <- clusterActor
|
||||||
|
s <- c._supervisor
|
||||||
|
} s.stop
|
||||||
clusterActor = None
|
clusterActor = None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer
|
||||||
*/
|
*/
|
||||||
object RemoteServer {
|
object RemoteServer {
|
||||||
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
||||||
val PORT = config.getInt("akka.remote.server.port", 9966)
|
val PORT = config.getInt("akka.remote.server.port", 9999)
|
||||||
|
|
||||||
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
|
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,10 @@ trait Serializer {
|
||||||
def deepClone(obj: AnyRef): AnyRef
|
def deepClone(obj: AnyRef): AnyRef
|
||||||
def out(obj: AnyRef): Array[Byte]
|
def out(obj: AnyRef): Array[Byte]
|
||||||
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
|
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
|
||||||
|
|
||||||
|
protected var classLoader: Option[ClassLoader] = None
|
||||||
|
|
||||||
|
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// For Java API
|
// For Java API
|
||||||
|
|
@ -52,10 +56,6 @@ object Serializer {
|
||||||
*/
|
*/
|
||||||
object Java extends Java
|
object Java extends Java
|
||||||
class Java extends Serializer {
|
class Java extends Serializer {
|
||||||
private var classLoader: Option[ClassLoader] = None
|
|
||||||
|
|
||||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
|
||||||
|
|
||||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
||||||
|
|
||||||
def out(obj: AnyRef): Array[Byte] = {
|
def out(obj: AnyRef): Array[Byte] = {
|
||||||
|
|
@ -107,10 +107,6 @@ object Serializer {
|
||||||
class JavaJSON extends Serializer {
|
class JavaJSON extends Serializer {
|
||||||
private val mapper = new ObjectMapper
|
private val mapper = new ObjectMapper
|
||||||
|
|
||||||
private var classLoader: Option[ClassLoader] = None
|
|
||||||
|
|
||||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
|
||||||
|
|
||||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
|
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
|
||||||
|
|
||||||
def out(obj: AnyRef): Array[Byte] = {
|
def out(obj: AnyRef): Array[Byte] = {
|
||||||
|
|
@ -143,10 +139,6 @@ object Serializer {
|
||||||
class ScalaJSON extends Serializer {
|
class ScalaJSON extends Serializer {
|
||||||
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
|
||||||
|
|
||||||
private var classLoader: Option[ClassLoader] = None
|
|
||||||
|
|
||||||
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
|
|
||||||
|
|
||||||
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
|
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
|
||||||
|
|
||||||
// FIXME set ClassLoader on SJSONSerializer.SJSON
|
// FIXME set ClassLoader on SJSONSerializer.SJSON
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,14 @@ trait VectorStorageBackend[T] extends StorageBackend {
|
||||||
trait RefStorageBackend[T] extends StorageBackend {
|
trait RefStorageBackend[T] extends StorageBackend {
|
||||||
def insertRefStorageFor(name: String, element: T)
|
def insertRefStorageFor(name: String, element: T)
|
||||||
def getRefStorageFor(name: String): Option[T]
|
def getRefStorageFor(name: String): Option[T]
|
||||||
|
def incrementAtomically(name: String): Option[Int] =
|
||||||
|
throw new UnsupportedOperationException // only for redis
|
||||||
|
def incrementByAtomically(name: String, by: Int): Option[Int] =
|
||||||
|
throw new UnsupportedOperationException // only for redis
|
||||||
|
def decrementAtomically(name: String): Option[Int] =
|
||||||
|
throw new UnsupportedOperationException // only for redis
|
||||||
|
def decrementByAtomically(name: String, by: Int): Option[Int] =
|
||||||
|
throw new UnsupportedOperationException // only for redis
|
||||||
}
|
}
|
||||||
|
|
||||||
// for Queue
|
// for Queue
|
||||||
|
|
|
||||||
|
|
@ -252,6 +252,38 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def incrementAtomically(name: String): Option[Int] = withErrorHandling {
|
||||||
|
db.incr(new String(encode(name.getBytes))) match {
|
||||||
|
case Some(i) => Some(i)
|
||||||
|
case None =>
|
||||||
|
throw new Predef.IllegalArgumentException(name + " exception in incr")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
|
||||||
|
db.incrBy(new String(encode(name.getBytes)), by) match {
|
||||||
|
case Some(i) => Some(i)
|
||||||
|
case None =>
|
||||||
|
throw new Predef.IllegalArgumentException(name + " exception in incrby")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def decrementAtomically(name: String): Option[Int] = withErrorHandling {
|
||||||
|
db.decr(new String(encode(name.getBytes))) match {
|
||||||
|
case Some(i) => Some(i)
|
||||||
|
case None =>
|
||||||
|
throw new Predef.IllegalArgumentException(name + " exception in decr")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
|
||||||
|
db.decrBy(new String(encode(name.getBytes)), by) match {
|
||||||
|
case Some(i) => Some(i)
|
||||||
|
case None =>
|
||||||
|
throw new Predef.IllegalArgumentException(name + " exception in decrby")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add to the end of the queue
|
// add to the end of the queue
|
||||||
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
|
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
|
||||||
db.rpush(new String(encode(name.getBytes)), new String(item))
|
db.rpush(new String(encode(name.getBytes)), new String(item))
|
||||||
|
|
|
||||||
|
|
@ -114,6 +114,48 @@ class RedisStorageBackendSpec extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describe("atomic increment in ref") {
|
||||||
|
it("should increment an existing key value by 1") {
|
||||||
|
insertRefStorageFor("T-4-1", "1200".getBytes)
|
||||||
|
new String(getRefStorageFor("T-4-1").get) should equal("1200")
|
||||||
|
incrementAtomically("T-4-1").get should equal(1201)
|
||||||
|
}
|
||||||
|
it("should create and increment a non-existing key value by 1") {
|
||||||
|
incrementAtomically("T-4-2").get should equal(1)
|
||||||
|
new String(getRefStorageFor("T-4-2").get) should equal("1")
|
||||||
|
}
|
||||||
|
it("should increment an existing key value by the amount specified") {
|
||||||
|
insertRefStorageFor("T-4-3", "1200".getBytes)
|
||||||
|
new String(getRefStorageFor("T-4-3").get) should equal("1200")
|
||||||
|
incrementByAtomically("T-4-3", 50).get should equal(1250)
|
||||||
|
}
|
||||||
|
it("should create and increment a non-existing key value by the amount specified") {
|
||||||
|
incrementByAtomically("T-4-4", 20).get should equal(20)
|
||||||
|
new String(getRefStorageFor("T-4-4").get) should equal("20")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("atomic decrement in ref") {
|
||||||
|
it("should decrement an existing key value by 1") {
|
||||||
|
insertRefStorageFor("T-4-5", "1200".getBytes)
|
||||||
|
new String(getRefStorageFor("T-4-5").get) should equal("1200")
|
||||||
|
decrementAtomically("T-4-5").get should equal(1199)
|
||||||
|
}
|
||||||
|
it("should create and decrement a non-existing key value by 1") {
|
||||||
|
decrementAtomically("T-4-6").get should equal(-1)
|
||||||
|
new String(getRefStorageFor("T-4-6").get) should equal("-1")
|
||||||
|
}
|
||||||
|
it("should decrement an existing key value by the amount specified") {
|
||||||
|
insertRefStorageFor("T-4-7", "1200".getBytes)
|
||||||
|
new String(getRefStorageFor("T-4-7").get) should equal("1200")
|
||||||
|
decrementByAtomically("T-4-7", 50).get should equal(1150)
|
||||||
|
}
|
||||||
|
it("should create and decrement a non-existing key value by the amount specified") {
|
||||||
|
decrementByAtomically("T-4-8", 20).get should equal(-20)
|
||||||
|
new String(getRefStorageFor("T-4-8").get) should equal("-20")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
describe("store and query in queue") {
|
describe("store and query in queue") {
|
||||||
it("should give proper queue semantics") {
|
it("should give proper queue semantics") {
|
||||||
enqueue("T-5", "alan kay".getBytes)
|
enqueue("T-5", "alan kay".getBytes)
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@
|
||||||
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
||||||
|
|
||||||
<cluster>
|
<cluster>
|
||||||
service = on # FIXME add 'service = on' for <cluster>
|
service = on
|
||||||
name = "default" # The name of the cluster
|
name = "default" # The name of the cluster
|
||||||
actor = "se.scalablesolutions.akka.cluster.jgroups.JGroupsClusterActor" # FQN of an implementation of ClusterActor
|
actor = "se.scalablesolutions.akka.cluster.jgroups.JGroupsClusterActor" # FQN of an implementation of ClusterActor
|
||||||
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
|
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue