From e33cf4ddd8ad7d3bec4c7e3c4bbf0dfacc1596d2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 7 Mar 2010 20:27:52 +0100 Subject: [PATCH 1/9] Should do the trick --- .../remote/BootableRemoteActorService.scala | 2 +- akka-core/src/main/scala/remote/Cluster.scala | 45 ++++++++++--------- .../main/scala/serialization/Serializer.scala | 16 ++----- 3 files changed, 30 insertions(+), 33 deletions(-) diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 6c3183ef8c..61c920c0af 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -24,8 +24,8 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if(config.getBool("akka.remote.server.service", true)){ - Cluster.start super.onLoad //Initialize BootableActorLoaderService before remote service + Cluster.start(self.applicationLoader) log.info("Initializing Remote Actors Service...") startRemoteService log.info("Remote Actors Service initialized!") diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index fb14b6b357..62aed8af1d 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -37,6 +37,10 @@ trait Cluster { */ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name") getOrElse "default" + + @volatile protected var serializer : Serializer = _ + + private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -110,7 +114,7 @@ abstract class BasicClusterActor extends ClusterActor { case m: Message[ADDR_T] => { val (src, msg) = (m.sender, m.msg) - (Cluster.serializer in (msg, None)) match { + (serializer in (msg, None)) match { case PapersPlease => { log debug ("Asked for papers by %s", src) @@ -156,7 
+160,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { - lazy val m = Cluster.serializer out msg + lazy val m = serializer out msg for (r <- recipients) toOneNode(r, m) } @@ -165,7 +169,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](msg: T): Unit = - if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg) + if (!remotes.isEmpty) toAllNodes(serializer out msg) /** * Applies the given PartialFunction to all known RemoteAddresses @@ -205,23 +209,21 @@ abstract class BasicClusterActor extends ClusterActor { object Cluster extends Cluster with Logging { lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName - @volatile private[remote] var clusterActor: Option[ClusterActor] = None + @volatile private[remote] var clusterActor: Option[ClusterActor] = None - // FIXME Use the supervisor member field - @volatile private[remote] var supervisor: Option[Supervisor] = None - - private[remote] lazy val serializer: Serializer = - Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - - private[remote] def createClusterActor: Option[ClusterActor] = { + private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") + + val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] + serializer setClassLoader loader try { name map { fqn => - Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + val a = 
Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + a setSerializer serializer + a } } catch { @@ -251,13 +253,14 @@ object Cluster extends Cluster with Logging { def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) - def start: Unit = synchronized { + def start: Unit = start(None) + + def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (supervisor.isEmpty) { - for (actor <- createClusterActor; - sup <- createSupervisor(actor)) { + if (clusterActor.isEmpty) { + for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + sup <- createSupervisor(actor) } { clusterActor = Some(actor) - supervisor = Some(sup) sup.start } } @@ -265,8 +268,10 @@ object Cluster extends Cluster with Logging { def shutdown: Unit = synchronized { log.info("Shutting down Cluster Service...") - supervisor.foreach(_.stop) - supervisor = None + for{ + c <- clusterActor + s <- c._supervisor + } s.stop clusterActor = None } } diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 3eb9315126..1cc930a7eb 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -21,6 +21,10 @@ trait Serializer { def deepClone(obj: AnyRef): AnyRef def out(obj: AnyRef): Array[Byte] def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef + + protected var classLoader: Option[ClassLoader] = None + + def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) } // For Java API @@ -52,10 +56,6 @@ object Serializer { */ object Java extends Java class Java extends Serializer { - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -107,10 +107,6 @@ object Serializer 
{ class JavaJSON extends Serializer { private val mapper = new ObjectMapper - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { @@ -143,10 +139,6 @@ object Serializer { class ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) // FIXME set ClassLoader on SJSONSerializer.SJSON From 92a3daac724e1c6c78ce94ce8b6bbe303eabddb1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 7 Mar 2010 20:32:25 +0100 Subject: [PATCH 2/9] Revert change to RemoteServer port --- akka-core/src/main/scala/remote/RemoteServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 02cf98bcd2..6da2ceea99 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer */ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9966) + val PORT = config.getInt("akka.remote.server.port", 9999) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) From b284760042f7476901eabc3ce09ee07a57b63c9b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 7 Mar 2010 21:13:07 +0100 Subject: [PATCH 3/9] Added documentation for all methods of the Cluster trait --- akka-core/src/main/scala/remote/Cluster.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/akka-core/src/main/scala/remote/Cluster.scala 
b/akka-core/src/main/scala/remote/Cluster.scala index fb14b6b357..e01567c935 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -17,16 +17,41 @@ import scala.collection.immutable.{Map, HashMap} * @author Viktor Klang */ trait Cluster { + /** + * Specifies the cluster name + */ def name: String + /** + * Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ def registerLocalNode(hostname: String, port: Int): Unit + /** + * Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ def deregisterLocalNode(hostname: String, port: Int): Unit + /** + * Sends the message to all Actors of the specified type on all other nodes in the cluster + */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit + /** + * Traverses all known remote addresses available at all other nodes in the cluster + * and applies the given PartialFunction on the first address that it's defined at + * The order of application is undefined and may vary + */ def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] + /** + * Applies the specified function to all known remote addresses on all other nodes in the cluster + * The order of application is undefined and may vary + */ def foreach(f: (RemoteAddress) => Unit): Unit } From 4840f789dd68e00199fa32ef79cb39b111c00a5c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 7 Mar 2010 23:23:09 +0100 Subject: [PATCH 4/9] Making it possible to turn cluster on/off in config --- .../src/main/scala/remote/BootableRemoteActorService.scala | 5 ++++- config/akka-reference.conf | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git 
a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 61c920c0af..e200db9619 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -25,7 +25,10 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if(config.getBool("akka.remote.server.service", true)){ super.onLoad //Initialize BootableActorLoaderService before remote service - Cluster.start(self.applicationLoader) + + if(config.getBool("akka.remote.cluster.service", true)) + Cluster.start(self.applicationLoader) + log.info("Initializing Remote Actors Service...") startRemoteService log.info("Remote Actors Service initialized!") diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 14933a3cac..66afda9d30 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -49,7 +49,7 @@ zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 - service = on # FIXME add 'service = on' for + service = on name = "default" # The name of the cluster actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class From b209e1ca6a7d20234332d02568d54061bc944e8a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 7 Mar 2010 23:45:25 +0100 Subject: [PATCH 5/9] Cleanup of onLoad --- .../src/main/scala/remote/BootableRemoteActorService.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index e200db9619..e8c7af612f 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ 
b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -23,8 +23,8 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start abstract override def onLoad = { + super.onLoad //Initialize BootableActorLoaderService before remote service if(config.getBool("akka.remote.server.service", true)){ - super.onLoad //Initialize BootableActorLoaderService before remote service if(config.getBool("akka.remote.cluster.service", true)) Cluster.start(self.applicationLoader) @@ -33,9 +33,6 @@ trait BootableRemoteActorService extends Bootable with Logging { startRemoteService log.info("Remote Actors Service initialized!") } - else - super.onLoad - } abstract override def onUnload = { From 198dfc4398f96adcfc97c559949eea392488f2ac Mon Sep 17 00:00:00 2001 From: Eckart Hertzler Date: Mon, 8 Mar 2010 10:53:21 +0100 Subject: [PATCH 6/9] prevent Exception when shutting down cluster --- .../src/main/scala/remote/BootableRemoteActorService.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index e8c7af612f..f68e9929d4 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -36,8 +36,6 @@ trait BootableRemoteActorService extends Bootable with Logging { } abstract override def onUnload = { - super.onUnload - log.info("Shutting down Remote Actors Service") RemoteNode.shutdown @@ -49,6 +47,8 @@ trait BootableRemoteActorService extends Bootable with Logging { Cluster.shutdown log.info("Remote Actors Service has been shut down") + + super.onUnload } -} \ No newline at end of file +} From f94dc3f12e714b99b84a3ced1facc615d60825cb Mon Sep 17 00:00:00 2001 From: Eckart Hertzler Date: Mon, 8 Mar 2010 15:31:36 +0100 Subject: [PATCH 7/9] fix classloader error when starting AKKA as 
a library in jetty (fixes http://www.assembla.com/spaces/akka/tickets/129 ) --- .../src/main/scala/actor/BootableActorLoaderService.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index 60f6ec3781..b027dbcc3a 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -30,7 +30,7 @@ trait BootableActorLoaderService extends Bootable with Logging { } val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) - new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader) + new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) } else getClass.getClassLoader) } @@ -43,4 +43,4 @@ trait BootableActorLoaderService extends Bootable with Logging { } abstract override def onUnload = ActorRegistry.shutdownAll -} \ No newline at end of file +} From edf1d9a6eb30d8210750050df83c68c10d84f2ad Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Tue, 9 Mar 2010 11:32:58 +0530 Subject: [PATCH 8/9] added atomic increment and decrement in RedisStorageBackend --- .../src/main/scala/StorageBackend.scala | 8 ++++ .../src/main/scala/RedisStorageBackend.scala | 32 +++++++++++++ .../test/scala/RedisStorageBackendSpec.scala | 45 +++++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index 94233acd0a..0bc4d07897 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -33,6 +33,14 @@ trait VectorStorageBackend[T] extends 
StorageBackend { trait RefStorageBackend[T] extends StorageBackend { def insertRefStorageFor(name: String, element: T) def getRefStorageFor(name: String): Option[T] + def incrementAtomically(name: String): Option[Int] = + throw new UnsupportedOperationException // only for redis + def incrementByAtomically(name: String, by: Int): Option[Int] = + throw new UnsupportedOperationException // only for redis + def decrementAtomically(name: String): Option[Int] = + throw new UnsupportedOperationException // only for redis + def decrementByAtomically(name: String, by: Int): Option[Int] = + throw new UnsupportedOperationException // only for redis } // for Queue diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index be214087f3..4d40872fb5 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -250,6 +250,38 @@ private [akka] object RedisStorageBackend extends } } + override def incrementAtomically(name: String): Option[Int] = withErrorHandling { + db.incr(new String(encode(name.getBytes))) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in incr") + } + } + + override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { + db.incrBy(new String(encode(name.getBytes)), by) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in incrby") + } + } + + override def decrementAtomically(name: String): Option[Int] = withErrorHandling { + db.decr(new String(encode(name.getBytes))) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in decr") + } + } + + override def decrementByAtomically(name: String, by: Int): Option[Int] 
= withErrorHandling { + db.decrBy(new String(encode(name.getBytes)), by) match { + case Some(i) => Some(i) + case None => + throw new Predef.IllegalArgumentException(name + " exception in decrby") + } + } + // add to the end of the queue def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.rpush(new String(encode(name.getBytes)), new String(item)) diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index 504a0e114d..8b80d88bea 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -114,6 +114,51 @@ class RedisStorageBackendSpec extends } } + describe("atomic increment in ref") { + it("should increment an existing key value by 1") { + insertRefStorageFor("T-4-1", "1200".getBytes) + new String(getRefStorageFor("T-4-1").get) should equal("1200") + incrementAtomically("T-4-1").get should equal(1201) + } + it("should create and increment a non-existing key value by 1") { + incrementAtomically("T-4-2").get should equal(1) + new String(getRefStorageFor("T-4-2").get) should equal("1") + } + it("should increment an existing key value by the amount specified") { + insertRefStorageFor("T-4-3", "1200".getBytes) + new String(getRefStorageFor("T-4-3").get) should equal("1200") + incrementByAtomically("T-4-3", 50).get should equal(1250) + } + it("should create and increment a non-existing key value by the amount specified") { + incrementByAtomically("T-4-4", 20).get should equal(20) + new String(getRefStorageFor("T-4-4").get) should equal("20") + } + } + + describe("atomic decrement in ref") { + it("should decrement an existing key value by 1") { + insertRefStorageFor("T-4-5", "1200".getBytes) + new String(getRefStorageFor("T-4-5").get) should equal("1200") + 
decrementAtomically("T-4-5").get should equal(1199) + } + it("should create and decrement a non-existing key value by 1") { + decrementAtomically("T-4-6").get should equal(-1) + new String(getRefStorageFor("T-4-6").get) should equal("-1") + } + it("should decrement an existing key value by the amount specified") { + insertRefStorageFor("T-4-7", "1200".getBytes) + new String(getRefStorageFor("T-4-7").get) should equal("1200") + decrementByAtomically("T-4-7", 50).get should equal(1150) + } + it("should create and decrement a non-existing key value by the amount specified") { + decrementByAtomically("T-4-8", 20).get should equal(-20) + new String(getRefStorageFor("T-4-8").get) should equal("-20") + } + } + + describe("atomic increment in ref") { + } + describe("store and query in queue") { it("should give proper queue semantics") { enqueue("T-5", "alan kay".getBytes) From 156ed2d461866bf0723bed8efc1fb6b23adf7050 Mon Sep 17 00:00:00 2001 From: "ross.mcdonald" Date: Wed, 10 Mar 2010 10:44:04 +0000 Subject: [PATCH 9/9] remove redundant method in tests --- .../src/test/scala/RedisStorageBackendSpec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index 8b80d88bea..09fc7fcfb7 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -156,9 +156,6 @@ class RedisStorageBackendSpec extends } } - describe("atomic increment in ref") { - } - describe("store and query in queue") { it("should give proper queue semantics") { enqueue("T-5", "alan kay".getBytes)