merged with upstream

This commit is contained in:
Jonas Bonér 2010-03-11 11:05:10 +01:00
commit dda8e515ca
9 changed files with 148 additions and 44 deletions

View file

@ -30,7 +30,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
} }
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader) new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
} else getClass.getClassLoader) } else getClass.getClassLoader)
} }
@ -43,4 +43,4 @@ trait BootableActorLoaderService extends Bootable with Logging {
} }
abstract override def onUnload = ActorRegistry.shutdownAll abstract override def onUnload = ActorRegistry.shutdownAll
} }

View file

@ -23,21 +23,19 @@ trait BootableRemoteActorService extends Bootable with Logging {
def startRemoteService = remoteServerThread.start def startRemoteService = remoteServerThread.start
abstract override def onLoad = { abstract override def onLoad = {
super.onLoad //Initialize BootableActorLoaderService before remote service
if(config.getBool("akka.remote.server.service", true)){ if(config.getBool("akka.remote.server.service", true)){
Cluster.start
super.onLoad //Initialize BootableActorLoaderService before remote service if(config.getBool("akka.remote.cluster.service", true))
Cluster.start(self.applicationLoader)
log.info("Initializing Remote Actors Service...") log.info("Initializing Remote Actors Service...")
startRemoteService startRemoteService
log.info("Remote Actors Service initialized!") log.info("Remote Actors Service initialized!")
} }
else
super.onLoad
} }
abstract override def onUnload = { abstract override def onUnload = {
super.onUnload
log.info("Shutting down Remote Actors Service") log.info("Shutting down Remote Actors Service")
RemoteNode.shutdown RemoteNode.shutdown
@ -49,6 +47,8 @@ trait BootableRemoteActorService extends Bootable with Logging {
Cluster.shutdown Cluster.shutdown
log.info("Remote Actors Service has been shut down") log.info("Remote Actors Service has been shut down")
super.onUnload
} }
} }

View file

@ -17,16 +17,41 @@ import scala.collection.immutable.{Map, HashMap}
* @author Viktor Klang * @author Viktor Klang
*/ */
trait Cluster { trait Cluster {
/**
* Specifies the cluster name
*/
def name: String def name: String
/**
* Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit def registerLocalNode(hostname: String, port: Int): Unit
/**
* Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit def deregisterLocalNode(hostname: String, port: Int): Unit
/**
* Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
/**
* Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at
* The order of application is undefined and may vary
*/
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
/**
* Applies the specified function to all known remote addresses on al other nodes in the cluster
* The order of application is undefined and may vary
*/
def foreach(f: (RemoteAddress) => Unit): Unit def foreach(f: (RemoteAddress) => Unit): Unit
} }
@ -37,6 +62,10 @@ trait Cluster {
*/ */
trait ClusterActor extends Actor with Cluster { trait ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name") getOrElse "default" val name = config.getString("akka.remote.cluster.name") getOrElse "default"
@volatile protected var serializer : Serializer = _
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
} }
/** /**
@ -110,7 +139,7 @@ abstract class BasicClusterActor extends ClusterActor {
case m: Message[ADDR_T] => { case m: Message[ADDR_T] => {
val (src, msg) = (m.sender, m.msg) val (src, msg) = (m.sender, m.msg)
(Cluster.serializer in (msg, None)) match { (serializer in (msg, None)) match {
case PapersPlease => { case PapersPlease => {
log debug ("Asked for papers by %s", src) log debug ("Asked for papers by %s", src)
@ -156,7 +185,7 @@ abstract class BasicClusterActor extends ClusterActor {
* that's been set in the akka-conf * that's been set in the akka-conf
*/ */
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
lazy val m = Cluster.serializer out msg lazy val m = serializer out msg
for (r <- recipients) toOneNode(r, m) for (r <- recipients) toOneNode(r, m)
} }
@ -165,7 +194,7 @@ abstract class BasicClusterActor extends ClusterActor {
* that's been set in the akka-conf * that's been set in the akka-conf
*/ */
protected def broadcast[T <: AnyRef](msg: T): Unit = protected def broadcast[T <: AnyRef](msg: T): Unit =
if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg) if (!remotes.isEmpty) toAllNodes(serializer out msg)
/** /**
* Applies the given PartialFunction to all known RemoteAddresses * Applies the given PartialFunction to all known RemoteAddresses
@ -205,23 +234,21 @@ abstract class BasicClusterActor extends ClusterActor {
object Cluster extends Cluster with Logging { object Cluster extends Cluster with Logging {
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
@volatile private[akka] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var clusterActor: Option[ClusterActor] = None
// FIXME Use the supervisor member field private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = {
@volatile private[akka] var supervisor: Option[Supervisor] = None
private[akka] lazy val serializer: Serializer =
Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
private[akka] def createClusterActor: Option[ClusterActor] = {
val name = config.getString("akka.remote.cluster.actor") val name = config.getString("akka.remote.cluster.actor")
if (name.isEmpty) throw new IllegalArgumentException( if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
serializer setClassLoader loader
try { try {
name map { name map {
fqn => fqn =>
Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
a setSerializer serializer
a
} }
} }
catch { catch {
@ -251,13 +278,14 @@ object Cluster extends Cluster with Logging {
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
def start: Unit = synchronized { def start: Unit = start(None)
def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized {
log.info("Starting up Cluster Service...") log.info("Starting up Cluster Service...")
if (supervisor.isEmpty) { if (clusterActor.isEmpty) {
for (actor <- createClusterActor; for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
sup <- createSupervisor(actor)) { sup <- createSupervisor(actor) } {
clusterActor = Some(actor) clusterActor = Some(actor)
supervisor = Some(sup)
sup.start sup.start
} }
} }
@ -265,8 +293,10 @@ object Cluster extends Cluster with Logging {
def shutdown: Unit = synchronized { def shutdown: Unit = synchronized {
log.info("Shutting down Cluster Service...") log.info("Shutting down Cluster Service...")
supervisor.foreach(_.stop) for{
supervisor = None c <- clusterActor
s <- c._supervisor
} s.stop
clusterActor = None clusterActor = None
} }
} }

View file

@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer
*/ */
object RemoteServer { object RemoteServer {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9966) val PORT = config.getInt("akka.remote.server.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)

View file

@ -21,6 +21,10 @@ trait Serializer {
def deepClone(obj: AnyRef): AnyRef def deepClone(obj: AnyRef): AnyRef
def out(obj: AnyRef): Array[Byte] def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
protected var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
} }
// For Java API // For Java API
@ -52,10 +56,6 @@ object Serializer {
*/ */
object Java extends Java object Java extends Java
class Java extends Serializer { class Java extends Serializer {
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
@ -107,10 +107,6 @@ object Serializer {
class JavaJSON extends Serializer { class JavaJSON extends Serializer {
private val mapper = new ObjectMapper private val mapper = new ObjectMapper
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
@ -143,10 +139,6 @@ object Serializer {
class ScalaJSON extends Serializer { class ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
// FIXME set ClassLoader on SJSONSerializer.SJSON // FIXME set ClassLoader on SJSONSerializer.SJSON

View file

@ -33,6 +33,14 @@ trait VectorStorageBackend[T] extends StorageBackend {
trait RefStorageBackend[T] extends StorageBackend { trait RefStorageBackend[T] extends StorageBackend {
def insertRefStorageFor(name: String, element: T) def insertRefStorageFor(name: String, element: T)
def getRefStorageFor(name: String): Option[T] def getRefStorageFor(name: String): Option[T]
def incrementAtomically(name: String): Option[Int] =
throw new UnsupportedOperationException // only for redis
def incrementByAtomically(name: String, by: Int): Option[Int] =
throw new UnsupportedOperationException // only for redis
def decrementAtomically(name: String): Option[Int] =
throw new UnsupportedOperationException // only for redis
def decrementByAtomically(name: String, by: Int): Option[Int] =
throw new UnsupportedOperationException // only for redis
} }
// for Queue // for Queue

View file

@ -252,6 +252,38 @@ private [akka] object RedisStorageBackend extends
} }
} }
override def incrementAtomically(name: String): Option[Int] = withErrorHandling {
db.incr(new String(encode(name.getBytes))) match {
case Some(i) => Some(i)
case None =>
throw new Predef.IllegalArgumentException(name + " exception in incr")
}
}
override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
db.incrBy(new String(encode(name.getBytes)), by) match {
case Some(i) => Some(i)
case None =>
throw new Predef.IllegalArgumentException(name + " exception in incrby")
}
}
override def decrementAtomically(name: String): Option[Int] = withErrorHandling {
db.decr(new String(encode(name.getBytes))) match {
case Some(i) => Some(i)
case None =>
throw new Predef.IllegalArgumentException(name + " exception in decr")
}
}
override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling {
db.decrBy(new String(encode(name.getBytes)), by) match {
case Some(i) => Some(i)
case None =>
throw new Predef.IllegalArgumentException(name + " exception in decrby")
}
}
// add to the end of the queue // add to the end of the queue
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.rpush(new String(encode(name.getBytes)), new String(item)) db.rpush(new String(encode(name.getBytes)), new String(item))

View file

@ -114,6 +114,48 @@ class RedisStorageBackendSpec extends
} }
} }
describe("atomic increment in ref") {
it("should increment an existing key value by 1") {
insertRefStorageFor("T-4-1", "1200".getBytes)
new String(getRefStorageFor("T-4-1").get) should equal("1200")
incrementAtomically("T-4-1").get should equal(1201)
}
it("should create and increment a non-existing key value by 1") {
incrementAtomically("T-4-2").get should equal(1)
new String(getRefStorageFor("T-4-2").get) should equal("1")
}
it("should increment an existing key value by the amount specified") {
insertRefStorageFor("T-4-3", "1200".getBytes)
new String(getRefStorageFor("T-4-3").get) should equal("1200")
incrementByAtomically("T-4-3", 50).get should equal(1250)
}
it("should create and increment a non-existing key value by the amount specified") {
incrementByAtomically("T-4-4", 20).get should equal(20)
new String(getRefStorageFor("T-4-4").get) should equal("20")
}
}
describe("atomic decrement in ref") {
it("should decrement an existing key value by 1") {
insertRefStorageFor("T-4-5", "1200".getBytes)
new String(getRefStorageFor("T-4-5").get) should equal("1200")
decrementAtomically("T-4-5").get should equal(1199)
}
it("should create and decrement a non-existing key value by 1") {
decrementAtomically("T-4-6").get should equal(-1)
new String(getRefStorageFor("T-4-6").get) should equal("-1")
}
it("should decrement an existing key value by the amount specified") {
insertRefStorageFor("T-4-7", "1200".getBytes)
new String(getRefStorageFor("T-4-7").get) should equal("1200")
decrementByAtomically("T-4-7", 50).get should equal(1150)
}
it("should create and decrement a non-existing key value by the amount specified") {
decrementByAtomically("T-4-8", 20).get should equal(-20)
new String(getRefStorageFor("T-4-8").get) should equal("-20")
}
}
describe("store and query in queue") { describe("store and query in queue") {
it("should give proper queue semantics") { it("should give proper queue semantics") {
enqueue("T-5", "alan kay".getBytes) enqueue("T-5", "alan kay".getBytes)

View file

@ -49,7 +49,7 @@
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
<cluster> <cluster>
service = on # FIXME add 'service = on' for <cluster> service = on
name = "default" # The name of the cluster name = "default" # The name of the cluster
actor = "se.scalablesolutions.akka.cluster.jgroups.JGroupsClusterActor" # FQN of an implementation of ClusterActor actor = "se.scalablesolutions.akka.cluster.jgroups.JGroupsClusterActor" # FQN of an implementation of ClusterActor
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class