diff --git a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java b/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java
index 655a84a68f..f7c3ef17d7 100755
--- a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java
+++ b/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java
@@ -1,3 +1,7 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
package com.scalablesolutions.akka.api;
import com.scalablesolutions.akka.kernel.configuration.*;
@@ -13,8 +17,8 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
-import scala.actors.behavior.Supervisor;
-import scala.actors.behavior.Worker;
+import com.scalablesolutions.akka.supervisor.Supervisor;
+import com.scalablesolutions.akka.supervisor.Worker;
/**
* @author Jonas Bonér
diff --git a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index ca00180c27..2907b8bfdb 100755
--- a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -4,12 +4,13 @@
package com.scalablesolutions.akka.api;
+import com.scalablesolutions.akka.annotation.oneway;
+import com.scalablesolutions.akka.kernel.configuration.*;
+
import com.google.inject.Inject;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
-import scala.actors.annotation.oneway;
-
import junit.framework.TestCase;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
@@ -99,9 +100,8 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
interface Foo {
public String foo(String msg);
- public
@oneway
- void bar(String msg);
+ public void bar(String msg);
public void longRunning();
diff --git a/config/akka-default.conf b/config/akka-default.conf
old mode 100755
new mode 100644
diff --git a/config/akka.conf.template b/config/akka.conf.template
old mode 100755
new mode 100644
diff --git a/config/scheduler.properties b/config/scheduler.properties
old mode 100755
new mode 100644
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 39a8c83736..c9cf4ec2e4 100755
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -15,26 +15,16 @@
+
+ ${akka.groupId}
+ akka-supervisor
+ ${akka.version}
+
org.scala-lang
scala-library
${scala.version}
-
- org.scala-libs
- scala-otp-behavior
- 0.1-SNAPSHOT
-
-
- org.scala-libs
- scala-otp-component
- 0.1-SNAPSHOT
-
-
- org.scala-libs
- scala-otp-util-java
- 0.1-SNAPSHOT
-
net.lag
configgy
@@ -70,6 +60,36 @@
jersey-atom
1.0.1
+
+ voldemort
+ voldemort
+ 0.4a
+
+
+ voldemort
+ voldemort-contrib
+ 0.4a
+
+
+ org.apache
+ zookeeper
+ 3.1.0
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.4.3
+
+
+ org.slf4j
+ slf4j-api
+ 1.4.3
+
+
+ log4j
+ log4j
+ 1.2.13
+
org.specs
specs
diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala
index 53a3f3818c..2726a5b8dc 100755
--- a/kernel/src/main/scala/ActiveObject.scala
+++ b/kernel/src/main/scala/ActiveObject.scala
@@ -4,7 +4,7 @@
package com.scalablesolutions.akka.kernel
-import scala.actors.behavior._
+import com.scalablesolutions.akka.supervisor._
import java.util.{List => JList, ArrayList}
@@ -24,7 +24,6 @@ class ActiveObjectFactory {
ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
}
-
/**
* @author Jonas Bonér
*/
@@ -48,7 +47,7 @@ object ActiveObject {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
}
val supervisor = factory.newSupervisor
- supervisor ! scala.actors.behavior.Start
+ supervisor ! com.scalablesolutions.akka.supervisor.Start
supervisor
}
@@ -65,7 +64,7 @@ object ActiveObject {
* @author Jonas Bonér
*/
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
- private val oneway = classOf[scala.actors.annotation.oneway]
+ private val oneway = classOf[com.scalablesolutions.akka.annotation.oneway]
private var targetInstance: AnyRef = _
private[akka] def setTargetInstance(instance: AnyRef) = targetInstance = instance
@@ -73,7 +72,6 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
try {
- println("==========> " + invocation)
reply(ErrRef(invocation.invoke))
} catch {
case e: InvocationTargetException =>
@@ -133,7 +131,10 @@ case class Invocation(val method: Method, val args: Array[Object], val target: A
private def isEqual(a1: Array[Object], a2: Array[Object]): Boolean =
(a1 == null && a2 == null) ||
- (a1 != null && a2 != null && a1.size == a2.size && a1.zip(a2).find(t => t._1 == t._2).isDefined)
+ (a1 != null &&
+ a2 != null &&
+ a1.size == a2.size &&
+ a1.zip(a2).find(t => t._1 == t._2).isDefined)
- private def argsToString(array: Array[Object]): String = array.foldLeft("(")(_ + " " + _) + ")"
+ private def argsToString(array: Array[Object]): String = synchronized { array.foldLeft("(")(_ + " " + _) + ")" }
}
diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/Configuration.scala
index bddb7940bf..dfb302e709 100755
--- a/kernel/src/main/scala/Configuration.scala
+++ b/kernel/src/main/scala/Configuration.scala
@@ -6,10 +6,7 @@ package com.scalablesolutions.akka.kernel.configuration
import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
import google.inject.{AbstractModule}
-
import java.util.{List => JList, ArrayList}
-
-import scala.actors.behavior._
import scala.reflect.BeanProperty
// ============================================
@@ -18,42 +15,42 @@ import scala.reflect.BeanProperty
sealed abstract class Configuration
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
- def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
+ def transform = com.scalablesolutions.akka.supervisor.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
- def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime)
+ def transform = com.scalablesolutions.akka.supervisor.LifeCycle(scope.transform, shutdownTime)
}
abstract class Scope extends Configuration {
- def transform: scala.actors.behavior.Scope
+ def transform: com.scalablesolutions.akka.supervisor.Scope
}
class Permanent extends Scope {
- override def transform = scala.actors.behavior.Permanent
+ override def transform = com.scalablesolutions.akka.supervisor.Permanent
}
class Transient extends Scope {
- override def transform = scala.actors.behavior.Transient
+ override def transform = com.scalablesolutions.akka.supervisor.Transient
}
class Temporary extends Scope {
- override def transform = scala.actors.behavior.Temporary
+ override def transform = com.scalablesolutions.akka.supervisor.Temporary
}
abstract class FailOverScheme extends Configuration {
- def transform: scala.actors.behavior.FailOverScheme
+ def transform: com.scalablesolutions.akka.supervisor.FailOverScheme
}
class AllForOne extends FailOverScheme {
- override def transform = scala.actors.behavior.AllForOne
+ override def transform = com.scalablesolutions.akka.supervisor.AllForOne
}
class OneForOne extends FailOverScheme {
- override def transform = scala.actors.behavior.OneForOne
+ override def transform = com.scalablesolutions.akka.supervisor.OneForOne
}
abstract class Server extends Configuration
//class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
-// def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
+// def transform = com.scalablesolutions.akka.supervisor.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
//}
class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
- def newWorker(proxy: ActiveObjectProxy) = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform)
+ def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.supervisor.Worker(proxy.server, lifeCycle.transform)
}
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index 6e27111c61..6c373a3a1e 100755
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -4,25 +4,128 @@
package com.scalablesolutions.akka.kernel
+import org.apache.zookeeper.jmx.ManagedUtil
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog
+import org.apache.zookeeper.server.ServerConfig
+import org.apache.zookeeper.server.NIOServerCnxn
+
+import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
+import voldemort.server.{VoldemortConfig, VoldemortServer}
+import voldemort.versioning.Versioned
+
import com.sun.grizzly.http.SelectorThread
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
+
import java.io.IOException
import java.net.URI
import java.util.{Map, HashMap}
+import java.io.{File, IOException}
+
import javax.ws.rs.core.UriBuilder
import javax.ws.rs.{Produces, Path, GET}
+import javax.management.JMException
+/**
+ * @author Jonas Bonér
+ */
object Kernel extends Logging {
- val SERVER_URL = "http://localhost/"
- val SERVER_PORT = 9998
- val BASE_URI = UriBuilder.fromUri(SERVER_URL).port(getPort(SERVER_PORT)).build()
+ val SERVER_URL = "localhost"
+ val HOME = System.getProperty("AKKA_HOME", "..")
- def getPort(defaultPort: Int) = {
+ val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/"
+ val JERSEY_SERVER_PORT = 9998
+ val JERSEY_REST_CLASSES_ROOT_PACKAGE = "com.scalablesolutions.akka.kernel"
+ val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
+
+ val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
+ val VOLDEMORT_SERVER_PORT = 6666
+
+ val ZOO_KEEPER_SERVER_URL = SERVER_URL
+ val ZOO_KEEPER_SERVER_PORT = 9898
+
+ def main(args: Array[String]) = {
+ //startZooKeeper
+ startVoldemort
+ //val threadSelector = startJersey
+
+ // TODO: handle shutdown of Jersey in separate thread
+ // TODO: spawn main in new thread an communicate using socket
+ //System.in.read
+ //threadSelector.stopEndpoint
+ }
+
+ private[akka] def startJersey: SelectorThread = {
+ val initParams = new java.util.HashMap[String, String]
+ initParams.put(
+ "com.sun.jersey.config.property.packages",
+ JERSEY_REST_CLASSES_ROOT_PACKAGE)
+ GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams)
+ }
+
+ private[akka] def startVoldemort = {
+ val config = VoldemortConfig.loadFromVoldemortHome(HOME)
+ val server = new VoldemortServer(config)
+ server.start
+ log.info("Replicated persistent storage server started at s%", VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT)
+ }
+
+ // private[akka] def startZooKeeper = {
+ // try {
+ // ManagedUtil.registerLog4jMBeans
+ // ServerConfig.parse(args)
+ // } catch {
+ // case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
+ // case e => log.fatal("Error in ZooKeeper config: s%", e)
+ // }
+ // val factory = new ZooKeeperServer.Factory() {
+ // override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
+ // override def createServer = {
+ // val server = new ZooKeeperServer
+ // val txLog = new FileTxnSnapLog(
+ // new File(ServerConfig.getDataLogDir),
+ // new File(ServerConfig.getDataDir))
+ // server.setTxnLogFactory(txLog)
+ // server
+ // }
+ // }
+ // try {
+ // val zooKeeper = factory.createServer
+ // zooKeeper.startup
+ // log.info("ZooKeeper started")
+ // // TODO: handle clean shutdown as below in separate thread
+ // // val cnxnFactory = serverFactory.createConnectionFactory
+ // // cnxnFactory.setZooKeeperServer(zooKeeper)
+ // // cnxnFactory.join
+ // // if (zooKeeper.isRunning) zooKeeper.shutdown
+ // } catch { case e => log.fatal("Unexpected exception: s%",e) }
+ // }
+
+ private[akka] def getStorage(storageName: String): StoreClient[String, String] = {
+ //Versioned value = client.get("some_key");
+ //value.setObject("some_value");
+ //client.put("some_key", value);
+ val numThreads = 10
+ val maxQueuedRequests = 10
+ val maxConnectionsPerNode = 10
+ val maxTotalConnections = 100
+ val bootstrapUrl = VOLDEMORT_SERVER_URL + VOLDEMORT_SERVER_PORT
+ val factory = new SocketStoreClientFactory(
+ numThreads,
+ numThreads,
+ maxQueuedRequests,
+ maxConnectionsPerNode,
+ maxTotalConnections,
+ bootstrapUrl)
+ factory.getStoreClient(storageName)
+ }
+
+ private def getPort(defaultPort: Int) = {
val port = System.getenv("JERSEY_HTTP_PORT")
if (null != port) Integer.parseInt(port)
else defaultPort;
}
+}
// @GET
// @Produces("application/json")
@@ -34,19 +137,3 @@ object Kernel extends Logging {
// q.getSingleResult.asInstanceOf[User]
// }
- def startServer: SelectorThread = {
- val initParams = new java.util.HashMap[String, String]
- initParams.put(
- "com.sun.jersey.config.property.packages",
- "com.scalablesolutions.akka.kernel")
- log.info("Starting grizzly...")
- GrizzlyWebContainerFactory.create(BASE_URI, initParams)
- }
-
- def main(args: Array[String]) {
- val threadSelector = startServer
- log.info("Akka kernel started at s%", BASE_URI)
- System.in.read
- threadSelector.stopEndpoint
- }
-}
diff --git a/kernel/src/test/scala/restManagerSpec.scala b/kernel/src/test/scala/restManagerSpec.scala
index ba2c8d41d1..cc02105cca 100755
--- a/kernel/src/test/scala/restManagerSpec.scala
+++ b/kernel/src/test/scala/restManagerSpec.scala
@@ -11,21 +11,18 @@ import javax.ws.rs.{Produces, Path, GET}
/**
* @author Jonas Bonér
*/
+class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven
+object restManagerSpec extends Specification {
+
+ "jersey server should be able to start and stop" in {
+ val threadSelector = Kernel.startJersey
+ threadSelector.stopEndpoint
+ }
+}
@Path("/helloworld")
class HelloWorldResource {
@GET
@Produces(Array("text/plain"))
- def getClichedMessage = "Hello World"
+ def getMessage = "Hello World"
}
-
-class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven
-object restManagerSpec extends Specification {
-
- "test" in {
- val threadSelector = Kernel.startServer
- val reply = System.in.read
- println("==============> " + reply)
- threadSelector.stopEndpoint
- }
-}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 00aea8fb57..eade95e985 100755
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,8 @@
+ util-java
+ supervisor
kernel
api-java