diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index d6e87ef027..5c80620d80 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -30,7 +30,7 @@ trait BootableActorLoaderService extends Bootable with Logging { } val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) - new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader) + new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) } else getClass.getClassLoader) } @@ -43,4 +43,4 @@ trait BootableActorLoaderService extends Bootable with Logging { } abstract override def onUnload = ActorRegistry.shutdownAll -} \ No newline at end of file +} diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 2b8c2aa132..8aaec0661b 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -23,21 +23,19 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start abstract override def onLoad = { + super.onLoad //Initialize BootableActorLoaderService before remote service if(config.getBool("akka.remote.server.service", true)){ - Cluster.start - super.onLoad //Initialize BootableActorLoaderService before remote service + + if(config.getBool("akka.remote.cluster.service", true)) + Cluster.start(self.applicationLoader) + log.info("Initializing Remote Actors Service...") startRemoteService log.info("Remote Actors Service initialized!") } - else - super.onLoad - } abstract override def onUnload = { - super.onUnload - log.info("Shutting down Remote Actors Service") RemoteNode.shutdown @@ -49,6 +47,8 @@ trait BootableRemoteActorService extends Bootable with Logging { Cluster.shutdown log.info("Remote Actors Service has been shut down") + + super.onUnload } -} \ No newline at end of file +} diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 145b07462f..7156c999bc 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -17,16 +17,41 @@ import scala.collection.immutable.{Map, HashMap} * @author Viktor Klang */ trait Cluster { + /** + * Specifies the cluster name + */ def name: String + /** + * Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ def registerLocalNode(hostname: String, port: Int): Unit + /** + * Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ def deregisterLocalNode(hostname: String, port: Int): Unit + /** + * Sends the message to all Actors of the specified type on all other nodes in the cluster + */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit + /** + * Traverses all known remote addresses avaiable at all other nodes in the cluster + * and applies the given PartialFunction on the first address that it's defined at + * The order of application is undefined and may vary + */ def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] + /** + * Applies the specified function to all known remote addresses on al other nodes in the cluster + * The order of application is undefined and may vary + */ def foreach(f: (RemoteAddress) => Unit): Unit } @@ -37,6 +62,10 @@ trait Cluster { */ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name") getOrElse "default" + + @volatile protected var serializer : Serializer = _ + + private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -110,7 +139,7 @@ abstract class BasicClusterActor extends ClusterActor { case m: Message[ADDR_T] => { val (src, msg) = (m.sender, m.msg) - (Cluster.serializer in (msg, None)) match { + (serializer in (msg, None)) match { case PapersPlease => { log debug ("Asked for papers by %s", src) @@ -156,7 +185,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { - lazy val m = Cluster.serializer out msg + lazy val m = serializer out msg for (r <- recipients) toOneNode(r, m) } @@ -165,7 +194,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](msg: T): Unit = - if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg) + if (!remotes.isEmpty) toAllNodes(serializer out msg) /** * Applies the given PartialFunction to all known RemoteAddresses @@ -205,23 +234,21 @@ abstract class BasicClusterActor extends ClusterActor { object Cluster extends Cluster with Logging { lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName - @volatile private[akka] var clusterActor: Option[ClusterActor] = None + @volatile private[remote] var clusterActor: Option[ClusterActor] = None - // FIXME Use the supervisor member field - @volatile private[akka] var supervisor: Option[Supervisor] = None - - private[akka] lazy val serializer: Serializer = - Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - - private[akka] def createClusterActor: Option[ClusterActor] = { + private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") + + val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] + serializer setClassLoader loader try { name map { fqn => - Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + a setSerializer serializer + a } } catch { @@ -251,13 +278,14 @@ object Cluster extends Cluster with Logging { def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) - def start: Unit = synchronized { + def start: Unit = start(None) + + def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (supervisor.isEmpty) { - for (actor <- createClusterActor; - sup <- createSupervisor(actor)) { + if (clusterActor.isEmpty) { + for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + sup <- createSupervisor(actor) } { clusterActor = Some(actor) - supervisor = Some(sup) sup.start } } @@ -265,8 +293,10 @@ object Cluster extends Cluster with Logging { def shutdown: Unit = synchronized { log.info("Shutting down Cluster Service...") - supervisor.foreach(_.stop) - supervisor = None + for{ + c <- clusterActor + s <- c._supervisor + } s.stop clusterActor = None } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index bcd6b55494..8a40049fea 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer */ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9966) + val PORT = config.getInt("akka.remote.server.port", 9999) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 3eb9315126..1cc930a7eb 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -21,6 +21,10 @@ trait Serializer { def deepClone(obj: AnyRef): AnyRef def out(obj: AnyRef): Array[Byte] def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef + + protected var classLoader: Option[ClassLoader] = None + + def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) } // For Java API @@ -52,10 +56,6 @@ object Serializer { */ object Java extends Java class Java extends Serializer { - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -107,10 +107,6 @@ object Serializer { class JavaJSON extends Serializer { private val mapper = new ObjectMapper - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { @@ -143,10 +139,6 @@ object Serializer { class ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) // FIXME set ClassLoader on SJSONSerializer.SJSON diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index cf08095334..d909f0e4a4 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -33,6 +33,14 @@ trait VectorStorageBackend[T] extends StorageBackend { trait RefStorageBackend[T] extends StorageBackend { def insertRefStorageFor(name: String, element: T) def getRefStorageFor(name: String): Option[T] + def incrementAtomically(name: String): Option[Int] = + throw new UnsupportedOperationException // only for redis + def incrementByAtomically(name: String, by: Int): Option[Int] = + throw new UnsupportedOperationException // only for redis + def decrementAtomically(name: String): Option[Int] = + throw new UnsupportedOperationException // only for redis + def decrementByAtomically(name: String, by: Int): Option[Int] = + throw new UnsupportedOperationException // only for redis } // for Queue diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 59a04135b4..8bca9c6af6 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -252,6 +252,38 @@ private [akka] object RedisStorageBackend extends } } + override def incrementAtomically(name: String): Option[Int] = withErrorHandling { + db.incr(new String(encode(name.getBytes))) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in incr") + } + } + + override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { + db.incrBy(new String(encode(name.getBytes)), by) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in incrby") + } + } + + override def decrementAtomically(name: String): Option[Int] = withErrorHandling { + db.decr(new String(encode(name.getBytes))) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in decr") + } + } + + override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { + db.decrBy(new String(encode(name.getBytes)), by) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in decrby") + } + } + // add to the end of the queue def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.rpush(new String(encode(name.getBytes)), new String(item)) diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index 737a7e2894..cfe704c6ba 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -114,6 +114,48 @@ class RedisStorageBackendSpec extends } } + describe("atomic increment in ref") { + it("should increment an existing key value by 1") { + insertRefStorageFor("T-4-1", "1200".getBytes) + new String(getRefStorageFor("T-4-1").get) should equal("1200") + incrementAtomically("T-4-1").get should equal(1201) + } + it("should create and increment a non-existing key value by 1") { + incrementAtomically("T-4-2").get should equal(1) + new String(getRefStorageFor("T-4-2").get) should equal("1") + } + it("should increment an existing key value by the amount specified") { + insertRefStorageFor("T-4-3", "1200".getBytes) + new String(getRefStorageFor("T-4-3").get) should equal("1200") + incrementByAtomically("T-4-3", 50).get should equal(1250) + } + it("should create and increment a non-existing key value by the amount specified") { + incrementByAtomically("T-4-4", 20).get should equal(20) + new String(getRefStorageFor("T-4-4").get) should equal("20") + } + } + + describe("atomic decrement in ref") { + it("should decrement an existing key value by 1") { + insertRefStorageFor("T-4-5", "1200".getBytes) + new String(getRefStorageFor("T-4-5").get) should equal("1200") + decrementAtomically("T-4-5").get should equal(1199) + } + it("should create and decrement a non-existing key value by 1") { + decrementAtomically("T-4-6").get should equal(-1) + new String(getRefStorageFor("T-4-6").get) should equal("-1") + } + it("should decrement an existing key value by the amount specified") { + insertRefStorageFor("T-4-7", "1200".getBytes) + new String(getRefStorageFor("T-4-7").get) should equal("1200") + decrementByAtomically("T-4-7", 50).get should equal(1150) + } + it("should create and decrement a non-existing key value by the amount specified") { + decrementByAtomically("T-4-8", 20).get should equal(-20) + new String(getRefStorageFor("T-4-8").get) should equal("-20") + } + } + describe("store and query in queue") { it("should give proper queue semantics") { enqueue("T-5", "alan kay".getBytes) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index dd3cfc01b2..ca9f1068ea 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -49,7 +49,7 @@ zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 - service = on # FIXME add 'service = on' for + service = on name = "default" # The name of the cluster actor = "se.scalablesolutions.akka.cluster.jgroups.JGroupsClusterActor" # FQN of an implementation of ClusterActor serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class