diff --git a/akka.ipr b/akka.ipr
index 226c8db6c4..ab65121d85 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -1869,17 +1869,6 @@
-
-
-
-
-
-
-
-
-
-
-
@@ -1924,6 +1913,17 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka.iws b/akka.iws
index 9b399760a0..ade494ece3 100644
--- a/akka.iws
+++ b/akka.iws
@@ -6,12 +6,16 @@
-
-
-
-
+
+
+
+
+
+
+
+
@@ -96,17 +100,113 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -114,7 +214,7 @@
-
+
@@ -124,7 +224,18 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
@@ -142,11 +253,8 @@
@@ -296,58 +407,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -445,6 +504,14 @@
+
+
+
+
+
+
+
+
@@ -694,88 +761,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -807,7 +792,7 @@
-
+
@@ -835,7 +820,7 @@
-
+
@@ -1030,6 +1015,17 @@
+
+
+
+
+
+
+
+
+
+
+
@@ -1233,34 +1229,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1843,17 +1811,23 @@
-
-
-
-
-
+
+
-
+
+
+
+
+
+
+
-
+
+
+
+
@@ -1895,7 +1869,7 @@
-
+
@@ -1903,28 +1877,27 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
localhost
@@ -2005,7 +1978,7 @@
-
+
@@ -2013,7 +1986,8 @@
-
+
+
@@ -2026,7 +2000,6 @@
-
@@ -2068,51 +2041,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -2123,7 +2051,18 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
@@ -2165,20 +2104,104 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml
index 91e7a458e0..fb2fcb4f5b 100644
--- a/kernel/akka-kernel.iml
+++ b/kernel/akka-kernel.iml
@@ -5,12 +5,12 @@
-
+
-
+
@@ -28,26 +28,27 @@
-
-
+
+
-
-
-
-
+
+
+
+
+
@@ -55,13 +56,10 @@
-
+
+
-
-
-
-
@@ -79,10 +77,6 @@
-
-
-
-
@@ -93,7 +87,6 @@
-
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
old mode 100755
new mode 100644
index 7ecfd61141..ada31e9f91
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -32,7 +32,7 @@ object Kernel extends Logging {
}
val config = setupConfig
-
+
val CONFIG_VERSION = config.getString("akka.version", "0")
if (VERSION != CONFIG_VERSION) throw new IllegalStateException("Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
@@ -51,7 +51,7 @@ object Kernel extends Logging {
private var jerseySelectorThread: SelectorThread = _
private val startTime = System.currentTimeMillis
private var applicationLoader: Option[ClassLoader] = None
-
+
def main(args: Array[String]) = boot
def boot = synchronized {
@@ -65,13 +65,13 @@ object Kernel extends Logging {
if (RUN_MANAGEMENT_SERVICE) startManagementService
STORAGE_SYSTEM match {
- case "cassandra" => startCassandra
- case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
- case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
- case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
- case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
+ case "cassandra" => startCassandra
+ case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
+ case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
+ case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
+ case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
- case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
+ case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
}
if (RUN_REST_SERVICE) startREST
@@ -81,26 +81,26 @@ object Kernel extends Logging {
hasBooted = true
}
}
-
+
def uptime = (System.currentTimeMillis - startTime) / 1000
def setupConfig: Config = {
- if (HOME.isDefined) {
- try {
- val configFile = HOME.get + "/config/akka.conf"
- Configgy.configure(configFile)
- log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile)
- } catch {
- case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.")
- }
- } else {
- try {
- Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
- log.info("Config loaded from the application classpath.")
- } catch {
- case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
- }
+ if (HOME.isDefined) {
+ try {
+ val configFile = HOME.get + "/config/akka.conf"
+ Configgy.configure(configFile)
+ log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile)
+ } catch {
+ case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.")
}
+ } else {
+ try {
+ Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
+ log.info("Config loaded from the application classpath.")
+ } catch {
+ case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
+ }
+ }
val config = Configgy.config
config.registerWithJmx("com.scalablesolutions.akka")
// FIXME fix Configgy JMX subscription to allow management
@@ -111,29 +111,29 @@ object Kernel extends Logging {
private[akka] def runApplicationBootClasses = {
new management.RestfulJMXBoot // add the REST/JMX service
val loader =
- if (HOME.isDefined) {
- val CONFIG = HOME.get + "/config"
- val DEPLOY = HOME.get + "/deploy"
- val DEPLOY_DIR = new File(DEPLOY)
- if (!DEPLOY_DIR.exists) { log.error("Could not find a deploy directory at [" + DEPLOY + "]"); System.exit(-1) }
- val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
- //val toDeploy = DEPLOY_DIR.toURL :: (for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL)
- log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
- new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
- } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
- getClass.getClassLoader
- } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ if (HOME.isDefined) {
+ val CONFIG = HOME.get + "/config"
+ val DEPLOY = HOME.get + "/deploy"
+ val DEPLOY_DIR = new File(DEPLOY)
+ if (!DEPLOY_DIR.exists) {log.error("Could not find a deploy directory at [" + DEPLOY + "]"); System.exit(-1)}
+ val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
+ //val toDeploy = DEPLOY_DIR.toURL :: (for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL)
+ log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
+ new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
+ } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
+ getClass.getClassLoader
+ } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
applicationLoader = Some(loader)
}
-
+
private[akka] def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
- def run = RemoteServer.start(applicationLoader)
+ def run = RemoteServer.start(applicationLoader)
}, "Akka Remote Service")
remoteServerThread.start
}
@@ -178,18 +178,18 @@ object Kernel extends Logging {
private def printBanner = {
log.info(
-"""==============================
- __ __
- _____ | | _| | _______
- \__ \ | |/ / |/ /\__ \
- / __ \| <| < / __ \_
- (____ /__|_ \__|_ \(____ /
- \/ \/ \/ \/
-""")
+ """==============================
+ __ __
+ _____ | | _| | _______
+ \__ \ | |/ / |/ /\__ \
+ / __ \| <| < / __ \_
+ (____ /__|_ \__|_ \(____ /
+ \/ \/ \/ \/
+ """)
log.info(" Running version " + VERSION)
log.info("==============================")
}
-
+
private def cassandraBenchmark = {
val NR_ENTRIES = 100000
@@ -206,7 +206,7 @@ object Kernel extends Logging {
CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
-
+
println("=================================================")
start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
@@ -219,15 +219,15 @@ object Kernel extends Logging {
-
-/*
- //import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
- //import voldemort.server.{VoldemortConfig, VoldemortServer}
- //import voldemort.versioning.Versioned
- private[this] var storageFactory: StoreClientFactory = _
- private[this] var storageServer: VoldemortServer = _
- */
+/*
+//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
+//import voldemort.server.{VoldemortConfig, VoldemortServer}
+//import voldemort.versioning.Versioned
+
+ private[this] var storageFactory: StoreClientFactory = _
+ private[this] var storageServer: VoldemortServer = _
+*/
// private[akka] def startVoldemort = {
// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
@@ -263,39 +263,39 @@ object Kernel extends Logging {
// private[akka] def getStorageFor(storageName: String): StoreClient[String, String] =
// storageFactory.getStoreClient(storageName)
- // private[akka] def startZooKeeper = {
- //import org.apache.zookeeper.jmx.ManagedUtil
- //import org.apache.zookeeper.server.persistence.FileTxnSnapLog
- //import org.apache.zookeeper.server.ServerConfig
- //import org.apache.zookeeper.server.NIOServerCnxn
- // val ZOO_KEEPER_SERVER_URL = SERVER_URL
- // val ZOO_KEEPER_SERVER_PORT = 9898
- // try {
- // ManagedUtil.registerLog4jMBeans
- // ServerConfig.parse(args)
- // } catch {
- // case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
- // case e => log.fatal("Error in ZooKeeper config: s%", e)
- // }
- // val factory = new ZooKeeperServer.Factory() {
- // override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
- // override def createServer = {
- // val server = new ZooKeeperServer
- // val txLog = new FileTxnSnapLog(
- // new File(ServerConfig.getDataLogDir),
- // new File(ServerConfig.getDataDir))
- // server.setTxnLogFactory(txLog)
- // server
- // }
- // }
- // try {
- // val zooKeeper = factory.createServer
- // zooKeeper.startup
- // log.info("ZooKeeper started")
- // // TODO: handle clean shutdown as below in separate thread
- // // val cnxnFactory = serverFactory.createConnectionFactory
- // // cnxnFactory.setZooKeeperServer(zooKeeper)
- // // cnxnFactory.join
- // // if (zooKeeper.isRunning) zooKeeper.shutdown
- // } catch { case e => log.fatal("Unexpected exception: s%",e) }
- // }
+// private[akka] def startZooKeeper = {
+//import org.apache.zookeeper.jmx.ManagedUtil
+//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
+//import org.apache.zookeeper.server.ServerConfig
+//import org.apache.zookeeper.server.NIOServerCnxn
+// val ZOO_KEEPER_SERVER_URL = SERVER_URL
+// val ZOO_KEEPER_SERVER_PORT = 9898
+// try {
+// ManagedUtil.registerLog4jMBeans
+// ServerConfig.parse(args)
+// } catch {
+// case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
+// case e => log.fatal("Error in ZooKeeper config: s%", e)
+// }
+// val factory = new ZooKeeperServer.Factory() {
+// override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
+// override def createServer = {
+// val server = new ZooKeeperServer
+// val txLog = new FileTxnSnapLog(
+// new File(ServerConfig.getDataLogDir),
+// new File(ServerConfig.getDataDir))
+// server.setTxnLogFactory(txLog)
+// server
+// }
+// }
+// try {
+// val zooKeeper = factory.createServer
+// zooKeeper.startup
+// log.info("ZooKeeper started")
+// // TODO: handle clean shutdown as below in separate thread
+// // val cnxnFactory = serverFactory.createConnectionFactory
+// // cnxnFactory.setZooKeeperServer(zooKeeper)
+// // cnxnFactory.join
+// // if (zooKeeper.isRunning) zooKeeper.shutdown
+// } catch { case e => log.fatal("Unexpected exception: s%",e) }
+// }
diff --git a/kernel/src/main/scala/util/Scheduler.scala b/kernel/src/main/scala/util/Scheduler.scala
index b292039446..95e9378de6 100644
--- a/kernel/src/main/scala/util/Scheduler.scala
+++ b/kernel/src/main/scala/util/Scheduler.scala
@@ -1,7 +1,3 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
package se.scalablesolutions.akka.kernel.util
import java.util.concurrent._
@@ -9,17 +5,14 @@ import kernel.actor.{OneForOneStrategy, Actor}
import org.scala_tools.javautils.Imports._
-case object Schedule
case object UnSchedule
case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
/**
- * Based on David Pollak's ActorPing class in the Lift Project.
- * Licensed under Apache 2 License.
+ * Rework of David Pollak's ActorPing class in the Lift Project
+ * which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
- receiver ! Schedule
-
def receive: PartialFunction[Any, Unit] = {
case UnSchedule =>
Scheduler.stopSupervising(this)
@@ -33,6 +26,7 @@ object Scheduler extends Actor {
private val schedulers = new ConcurrentHashMap[Actor, Actor]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = true
+ start
def schedule(receiver: Actor, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
@@ -73,3 +67,5 @@ private object SchedulerThreadFactory extends ThreadFactory {
thread
}
}
+
+
diff --git a/kernel/src/test/scala/AllTest.scala b/kernel/src/test/scala/AllTest.scala
index a225bfb080..546f5cc446 100644
--- a/kernel/src/test/scala/AllTest.scala
+++ b/kernel/src/test/scala/AllTest.scala
@@ -6,6 +6,7 @@ import junit.framework.TestSuite
import kernel.actor.{ActorSpec, RemoteActorSpec, PersistentActorSpec, InMemoryActorSpec}
import kernel.reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest}
+import kernel.util.SchedulerSpec
object AllTest extends TestCase {
def suite(): Test = {
@@ -18,6 +19,8 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[RemoteActorSpec])
//suite.addTestSuite(classOf[PersistentActorSpec])
suite.addTestSuite(classOf[InMemoryActorSpec])
+ suite.addTestSuite(classOf[SchedulerSpec])
+
//suite.addTestSuite(classOf[TransactionClasherSpec])
suite
}
diff --git a/kernel/src/test/scala/SchedulerSpec.scala b/kernel/src/test/scala/SchedulerSpec.scala
new file mode 100644
index 0000000000..bf763046fc
--- /dev/null
+++ b/kernel/src/test/scala/SchedulerSpec.scala
@@ -0,0 +1,25 @@
+package se.scalablesolutions.akka.kernel.util
+
+import se.scalablesolutions.akka.kernel.actor.Actor
+
+import java.util.concurrent.TimeUnit
+
+import org.junit.Assert._
+
+class SchedulerSpec extends junit.framework.TestCase {
+
+ def testScheduler = {
+ var count = 0
+ case object Tick
+ val actor = new Actor() {
+ def receive: PartialFunction[Any, Unit] = {
+ case Tick => count += 1
+ }}
+ actor.start
+ Thread.sleep(1000)
+ Scheduler.schedule(actor, Tick, 0L, 1L, TimeUnit.SECONDS)
+ Thread.sleep(5000)
+ Scheduler.shutdown
+ assertTrue(count > 0)
+ }
+}
\ No newline at end of file
diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml
index 5871ff71d7..243de784f2 100644
--- a/samples-java/akka-samples-java.iml
+++ b/samples-java/akka-samples-java.iml
@@ -9,8 +9,8 @@
-
-
+
+
diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml
index 955c18a9ce..b864b09ba5 100644
--- a/samples-scala/akka-samples-scala.iml
+++ b/samples-scala/akka-samples-scala.iml
@@ -14,8 +14,8 @@
-
-
+
+
diff --git a/util-java/akka-util-java.iml b/util-java/akka-util-java.iml
index 1998394879..646310bd42 100644
--- a/util-java/akka-util-java.iml
+++ b/util-java/akka-util-java.iml
@@ -11,9 +11,8 @@
-
-
+