From 23c104003b442959c977ae424ed48ad0f9a463c8 Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Tue, 10 Mar 2009 00:56:42 +0100 Subject: [PATCH] changed package imports for supervisor --- .../api/ActiveObjectGuiceConfigurator.java | 8 +- .../ActiveObjectGuiceConfiguratorTest.java | 8 +- config/akka-default.conf | 0 config/akka.conf.template | 0 config/scheduler.properties | 0 kernel/pom.xml | 50 ++++--- kernel/src/main/scala/ActiveObject.scala | 15 ++- kernel/src/main/scala/Configuration.scala | 25 ++-- kernel/src/main/scala/Kernel.scala | 127 +++++++++++++++--- kernel/src/test/scala/restManagerSpec.scala | 21 ++- pom.xml | 2 + 11 files changed, 182 insertions(+), 74 deletions(-) mode change 100755 => 100644 config/akka-default.conf mode change 100755 => 100644 config/akka.conf.template mode change 100755 => 100644 config/scheduler.properties diff --git a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java b/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java index 655a84a68f..f7c3ef17d7 100755 --- a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java +++ b/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + package com.scalablesolutions.akka.api; import com.scalablesolutions.akka.kernel.configuration.*; @@ -13,8 +17,8 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; -import scala.actors.behavior.Supervisor; -import scala.actors.behavior.Worker; +import com.scalablesolutions.akka.supervisor.Supervisor; +import com.scalablesolutions.akka.supervisor.Worker; /** * @author Jonas Bonér diff --git a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java index ca00180c27..2907b8bfdb 100755 --- a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java @@ -4,12 +4,13 @@ package com.scalablesolutions.akka.api; +import com.scalablesolutions.akka.annotation.oneway; +import com.scalablesolutions.akka.kernel.configuration.*; + import com.google.inject.Inject; import com.google.inject.AbstractModule; import com.google.inject.Scopes; -import scala.actors.annotation.oneway; - import junit.framework.TestCase; public class ActiveObjectGuiceConfiguratorTest extends TestCase { @@ -99,9 +100,8 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { interface Foo { public String foo(String msg); - public @oneway - void bar(String msg); + public void bar(String msg); public void longRunning(); diff --git a/config/akka-default.conf b/config/akka-default.conf old mode 100755 new mode 100644 diff --git a/config/akka.conf.template b/config/akka.conf.template old mode 100755 new mode 100644 diff --git a/config/scheduler.properties b/config/scheduler.properties old mode 100755 new mode 100644 diff --git a/kernel/pom.xml b/kernel/pom.xml index 39a8c83736..c9cf4ec2e4 100755 --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -15,26 +15,16 @@ + + ${akka.groupId} + akka-supervisor + ${akka.version} + org.scala-lang scala-library ${scala.version} - - org.scala-libs - scala-otp-behavior - 0.1-SNAPSHOT - - - org.scala-libs - scala-otp-component - 0.1-SNAPSHOT - - - org.scala-libs - scala-otp-util-java - 0.1-SNAPSHOT - net.lag configgy @@ -70,6 +60,36 @@ jersey-atom 1.0.1 + + voldemort + voldemort + 0.4a + + + voldemort + voldemort-contrib + 0.4a + + + org.apache + zookeeper + 3.1.0 + + + org.slf4j + slf4j-log4j12 + 1.4.3 + + + org.slf4j + slf4j-api + 1.4.3 + + + log4j + log4j + 1.2.13 + org.specs specs diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index 53a3f3818c..2726a5b8dc 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -4,7 +4,7 @@ package com.scalablesolutions.akka.kernel -import scala.actors.behavior._ +import com.scalablesolutions.akka.supervisor._ import java.util.{List => JList, ArrayList} @@ -24,7 +24,6 @@ class ActiveObjectFactory { ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]]) } - /** * @author Jonas Bonér */ @@ -48,7 +47,7 @@ object ActiveObject { override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) } val supervisor = factory.newSupervisor - supervisor ! scala.actors.behavior.Start + supervisor ! com.scalablesolutions.akka.supervisor.Start supervisor } @@ -65,7 +64,7 @@ object ActiveObject { * @author Jonas Bonér */ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler { - private val oneway = classOf[scala.actors.annotation.oneway] + private val oneway = classOf[com.scalablesolutions.akka.annotation.oneway] private var targetInstance: AnyRef = _ private[akka] def setTargetInstance(instance: AnyRef) = targetInstance = instance @@ -73,7 +72,6 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I override def body: PartialFunction[Any, Unit] = { case invocation: Invocation => try { - println("==========> " + invocation) reply(ErrRef(invocation.invoke)) } catch { case e: InvocationTargetException => @@ -133,7 +131,10 @@ case class Invocation(val method: Method, val args: Array[Object], val target: A private def isEqual(a1: Array[Object], a2: Array[Object]): Boolean = (a1 == null && a2 == null) || - (a1 != null && a2 != null && a1.size == a2.size && a1.zip(a2).find(t => t._1 == t._2).isDefined) + (a1 != null && + a2 != null && + a1.size == a2.size && + a1.zip(a2).find(t => t._1 == t._2).isDefined) - private def argsToString(array: Array[Object]): String = array.foldLeft("(")(_ + " " + _) + ")" + private def argsToString(array: Array[Object]): String = synchronized { array.foldLeft("(")(_ + " " + _) + ")" } } diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/Configuration.scala index bddb7940bf..dfb302e709 100755 --- a/kernel/src/main/scala/Configuration.scala +++ b/kernel/src/main/scala/Configuration.scala @@ -6,10 +6,7 @@ package com.scalablesolutions.akka.kernel.configuration import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy} import google.inject.{AbstractModule} - import java.util.{List => JList, ArrayList} - -import scala.actors.behavior._ import scala.reflect.BeanProperty // ============================================ @@ -18,42 +15,42 @@ import scala.reflect.BeanProperty sealed abstract class Configuration class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration { - def transform = scala.actors.behavior.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) + def transform = com.scalablesolutions.akka.supervisor.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) } class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration { - def transform = scala.actors.behavior.LifeCycle(scope.transform, shutdownTime) + def transform = com.scalablesolutions.akka.supervisor.LifeCycle(scope.transform, shutdownTime) } abstract class Scope extends Configuration { - def transform: scala.actors.behavior.Scope + def transform: com.scalablesolutions.akka.supervisor.Scope } class Permanent extends Scope { - override def transform = scala.actors.behavior.Permanent + override def transform = com.scalablesolutions.akka.supervisor.Permanent } class Transient extends Scope { - override def transform = scala.actors.behavior.Transient + override def transform = com.scalablesolutions.akka.supervisor.Transient } class Temporary extends Scope { - override def transform = scala.actors.behavior.Temporary + override def transform = com.scalablesolutions.akka.supervisor.Temporary } abstract class FailOverScheme extends Configuration { - def transform: scala.actors.behavior.FailOverScheme + def transform: com.scalablesolutions.akka.supervisor.FailOverScheme } class AllForOne extends FailOverScheme { - override def transform = scala.actors.behavior.AllForOne + override def transform = com.scalablesolutions.akka.supervisor.AllForOne } class OneForOne extends FailOverScheme { - override def transform = scala.actors.behavior.OneForOne + override def transform = com.scalablesolutions.akka.supervisor.OneForOne } abstract class Server extends Configuration //class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server { -// def transform = scala.actors.behavior.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) +// def transform = com.scalablesolutions.akka.supervisor.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) //} class Component(@BeanProperty val intf: Class[_], @BeanProperty val target: Class[_], @BeanProperty val lifeCycle: LifeCycle, @BeanProperty val timeout: Int) extends Server { - def newWorker(proxy: ActiveObjectProxy) = scala.actors.behavior.Worker(proxy.server, lifeCycle.transform) + def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.supervisor.Worker(proxy.server, lifeCycle.transform) } diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index 6e27111c61..6c373a3a1e 100755 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -4,25 +4,128 @@ package com.scalablesolutions.akka.kernel +import org.apache.zookeeper.jmx.ManagedUtil +import org.apache.zookeeper.server.persistence.FileTxnSnapLog +import org.apache.zookeeper.server.ServerConfig +import org.apache.zookeeper.server.NIOServerCnxn + +import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory} +import voldemort.server.{VoldemortConfig, VoldemortServer} +import voldemort.versioning.Versioned + import com.sun.grizzly.http.SelectorThread import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory + import java.io.IOException import java.net.URI import java.util.{Map, HashMap} +import java.io.{File, IOException} + import javax.ws.rs.core.UriBuilder import javax.ws.rs.{Produces, Path, GET} +import javax.management.JMException +/** + * @author Jonas Bonér + */ object Kernel extends Logging { - val SERVER_URL = "http://localhost/" - val SERVER_PORT = 9998 - val BASE_URI = UriBuilder.fromUri(SERVER_URL).port(getPort(SERVER_PORT)).build() + val SERVER_URL = "localhost" + val HOME = System.getProperty("AKKA_HOME", "..") - def getPort(defaultPort: Int) = { + val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/" + val JERSEY_SERVER_PORT = 9998 + val JERSEY_REST_CLASSES_ROOT_PACKAGE = "com.scalablesolutions.akka.kernel" + val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build() + + val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL + val VOLDEMORT_SERVER_PORT = 6666 + + val ZOO_KEEPER_SERVER_URL = SERVER_URL + val ZOO_KEEPER_SERVER_PORT = 9898 + + def main(args: Array[String]) = { + //startZooKeeper + startVoldemort + //val threadSelector = startJersey + + // TODO: handle shutdown of Jersey in separate thread + // TODO: spawn main in new thread an communicate using socket + //System.in.read + //threadSelector.stopEndpoint + } + + private[akka] def startJersey: SelectorThread = { + val initParams = new java.util.HashMap[String, String] + initParams.put( + "com.sun.jersey.config.property.packages", + JERSEY_REST_CLASSES_ROOT_PACKAGE) + GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams) + } + + private[akka] def startVoldemort = { + val config = VoldemortConfig.loadFromVoldemortHome(HOME) + val server = new VoldemortServer(config) + server.start + log.info("Replicated persistent storage server started at s%", VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT) + } + + // private[akka] def startZooKeeper = { + // try { + // ManagedUtil.registerLog4jMBeans + // ServerConfig.parse(args) + // } catch { + // case e: JMException => log.warning("Unable to register log4j JMX control: s%", e) + // case e => log.fatal("Error in ZooKeeper config: s%", e) + // } + // val factory = new ZooKeeperServer.Factory() { + // override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort) + // override def createServer = { + // val server = new ZooKeeperServer + // val txLog = new FileTxnSnapLog( + // new File(ServerConfig.getDataLogDir), + // new File(ServerConfig.getDataDir)) + // server.setTxnLogFactory(txLog) + // server + // } + // } + // try { + // val zooKeeper = factory.createServer + // zooKeeper.startup + // log.info("ZooKeeper started") + // // TODO: handle clean shutdown as below in separate thread + // // val cnxnFactory = serverFactory.createConnectionFactory + // // cnxnFactory.setZooKeeperServer(zooKeeper) + // // cnxnFactory.join + // // if (zooKeeper.isRunning) zooKeeper.shutdown + // } catch { case e => log.fatal("Unexpected exception: s%",e) } + // } + + private[akka] def getStorage(storageName: String): StoreClient[String, String] = { + //Versioned value = client.get("some_key"); + //value.setObject("some_value"); + //client.put("some_key", value); + val numThreads = 10 + val maxQueuedRequests = 10 + val maxConnectionsPerNode = 10 + val maxTotalConnections = 100 + val bootstrapUrl = VOLDEMORT_SERVER_URL + VOLDEMORT_SERVER_PORT + val factory = new SocketStoreClientFactory( + numThreads, + numThreads, + maxQueuedRequests, + maxConnectionsPerNode, + maxTotalConnections, + bootstrapUrl) + factory.getStoreClient(storageName) + } + + private def getPort(defaultPort: Int) = { val port = System.getenv("JERSEY_HTTP_PORT") if (null != port) Integer.parseInt(port) else defaultPort; } +} // @GET // @Produces("application/json") @@ -34,19 +137,3 @@ object Kernel extends Logging { // q.getSingleResult.asInstanceOf[User] // } - def startServer: SelectorThread = { - val initParams = new java.util.HashMap[String, String] - initParams.put( - "com.sun.jersey.config.property.packages", - "com.scalablesolutions.akka.kernel") - log.info("Starting grizzly...") - GrizzlyWebContainerFactory.create(BASE_URI, initParams) - } - - def main(args: Array[String]) { - val threadSelector = startServer - log.info("Akka kernel started at s%", BASE_URI) - System.in.read - threadSelector.stopEndpoint - } -} diff --git a/kernel/src/test/scala/restManagerSpec.scala b/kernel/src/test/scala/restManagerSpec.scala index ba2c8d41d1..cc02105cca 100755 --- a/kernel/src/test/scala/restManagerSpec.scala +++ b/kernel/src/test/scala/restManagerSpec.scala @@ -11,21 +11,18 @@ import javax.ws.rs.{Produces, Path, GET} /** * @author Jonas Bonér */ +class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven +object restManagerSpec extends Specification { + + "jersey server should be able to start and stop" in { + val threadSelector = Kernel.startJersey + threadSelector.stopEndpoint + } +} @Path("/helloworld") class HelloWorldResource { @GET @Produces(Array("text/plain")) - def getClichedMessage = "Hello World" + def getMessage = "Hello World" } - -class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven -object restManagerSpec extends Specification { - - "test" in { - val threadSelector = Kernel.startServer - val reply = System.in.read - println("==============> " + reply) - threadSelector.stopEndpoint - } -} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 00aea8fb57..eade95e985 100755 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,8 @@ + util-java + supervisor kernel api-java