diff --git a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java b/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java deleted file mode 100755 index f7c3ef17d7..0000000000 --- a/api-java/src/main/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.api; - -import com.scalablesolutions.akka.kernel.configuration.*; - -import com.google.inject.*; -import com.google.inject.jsr250.ResourceProviderFactory; - -import com.scalablesolutions.akka.kernel.ActiveObjectFactory; -import com.scalablesolutions.akka.kernel.ActiveObjectProxy; - -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; - -import com.scalablesolutions.akka.supervisor.Supervisor; -import com.scalablesolutions.akka.supervisor.Worker; - -/** - * @author Jonas Bonér - */ -public class ActiveObjectGuiceConfigurator { - private List modules = new ArrayList(); - private Injector injector; - private Supervisor supervisor; - private RestartStrategy restartStrategy; - private Component[] components; - private Map configRegistry = new HashMap(); - private Map activeObjectRegistry = new HashMap(); - private ActiveObjectFactory activeObjectFactory = new ActiveObjectFactory(); - - public synchronized T getExternalDependency(Class clazz) { - return injector.getInstance(clazz); - } - - /** - * Returns the active abject that has been put under supervision for the class specified. - * - * @param clazz the class for the active object - * @return the active object for the class - */ - public synchronized T getActiveObject(Class clazz) { - if (injector == null) throw new IllegalStateException("inject() and supervise() must be called before invoking newInstance(clazz)"); - if (activeObjectRegistry.containsKey(clazz)) { - final ActiveObjectProxy activeObjectProxy = activeObjectRegistry.get(clazz); - activeObjectProxy.setTargetInstance(injector.getInstance(clazz)); - return (T)activeObjectFactory.newInstance(clazz, activeObjectProxy); - } else throw new IllegalStateException("Class " + clazz.getName() + " has not been put under supervision (by passing in the config to the supervise() method"); - } - - public synchronized ActiveObjectGuiceConfigurator configureActiveObjects(final RestartStrategy restartStrategy, final Component[] components) { - this.restartStrategy = restartStrategy; - this.components = components; - modules.add(new AbstractModule() { - protected void configure() { - bind(ResourceProviderFactory.class); - for (int i = 0; i < components.length; i++) { - Component c = components[i]; - bind((Class) c.intf()).to((Class) c.target()).in(Singleton.class); - } - } - }); - return this; - } - - public synchronized ActiveObjectGuiceConfigurator inject() { - if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator"); - injector = Guice.createInjector(modules); - return this; - } - - public synchronized ActiveObjectGuiceConfigurator supervise() { - if (injector == null) inject(); - injector = Guice.createInjector(modules); - List workers = new ArrayList(); - for (int i = 0; i < components.length; i++) { - final Component c = components[i]; - final ActiveObjectProxy activeObjectProxy = new ActiveObjectProxy(c.intf(), c.target(), c.timeout()); - workers.add(c.newWorker(activeObjectProxy)); - activeObjectRegistry.put(c.intf(), activeObjectProxy); - } - supervisor = activeObjectFactory.supervise(restartStrategy.transform(), workers); - return this; - } - - - /** - * Add additional services to be wired in. - *
-   * ActiveObjectGuiceConfigurator.addExternalGuiceModule(new AbstractModule {
-   *   protected void configure() {
-   *     bind(Foo.class).to(FooImpl.class).in(Scopes.SINGLETON);
-   *     bind(BarImpl.class);
-   *     link(Bar.class).to(BarImpl.class);
-   *     bindConstant(named("port")).to(8080);
-   *   }})
-   * 
- */ - public synchronized ActiveObjectGuiceConfigurator addExternalGuiceModule(Module module) { - modules.add(module); - return this; - } - - public List getGuiceModules() { - return modules; - } - - public synchronized void reset() { - modules = new ArrayList(); - configRegistry = new HashMap(); - activeObjectRegistry = new HashMap(); - injector = null; - restartStrategy = null; - } - - public synchronized void stop() { - // TODO: fix supervisor.stop(); - } -} diff --git a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java deleted file mode 100755 index 2907b8bfdb..0000000000 --- a/api-java/src/test/java/com/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.api; - -import com.scalablesolutions.akka.annotation.oneway; -import com.scalablesolutions.akka.kernel.configuration.*; - -import com.google.inject.Inject; -import com.google.inject.AbstractModule; -import com.google.inject.Scopes; - -import junit.framework.TestCase; - -public class ActiveObjectGuiceConfiguratorTest extends TestCase { - static String messageLog = ""; - - final private ActiveObjectGuiceConfigurator conf = new ActiveObjectGuiceConfigurator(); - - protected void setUp() { - conf.addExternalGuiceModule(new AbstractModule() { - protected void configure() { - bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON); - } - }).configureActiveObjects( - new RestartStrategy(new AllForOne(), 3, 100), new Component[]{ - new Component( - Foo.class, - FooImpl.class, - new LifeCycle(new Permanent(), 100), - 1000), - new Component( - Bar.class, - BarImpl.class, - new LifeCycle(new Permanent(), 100), - 1000) - }).inject().supervise(); - - } - - public void testGuiceActiveObjectInjection() { - messageLog = ""; - Foo foo = conf.getActiveObject(Foo.class); - Bar bar = conf.getActiveObject(Bar.class); - assertTrue(foo.getBar().toString().equals(bar.toString())); - } - - public void testGuiceExternalDependencyInjection() { - messageLog = ""; - Bar bar = conf.getActiveObject(Bar.class); - Ext ext = conf.getExternalDependency(Ext.class); - assertTrue(bar.getExt().toString().equals(ext.toString())); - } - - public void testActiveObjectInvocation() throws InterruptedException { - messageLog = ""; - Foo foo = conf.getActiveObject(Foo.class); - messageLog += foo.foo("foo "); - foo.bar("bar "); - messageLog += "before_bar "; - Thread.sleep(500); - assertEquals("foo return_foo before_bar ", messageLog); - } - - public void testActiveObjectInvocationsInvocation() throws InterruptedException { - messageLog = ""; - Foo foo = conf.getActiveObject(Foo.class); - Bar bar = conf.getActiveObject(Bar.class); - messageLog += foo.foo("foo "); - foo.bar("bar "); - messageLog += "before_bar "; - Thread.sleep(500); - assertEquals("foo return_foo before_bar ", messageLog); - } - - public void testForcedTimeout() { - messageLog = ""; - Foo foo = conf.getActiveObject(Foo.class); - try { - foo.longRunning(); - fail("exception should have been thrown"); - } catch (com.scalablesolutions.akka.kernel.ActiveObjectInvocationTimeoutException e) { - } - } - - public void testForcedException() { - messageLog = ""; - Foo foo = conf.getActiveObject(Foo.class); - try { - foo.throwsException(); - fail("exception should have been thrown"); - } catch (RuntimeException e) { - } - } -} - -// ============== TEST SERVICES =============== - -interface Foo { - public String foo(String msg); - - @oneway - public void bar(String msg); - - public void longRunning(); - - public void throwsException(); - - public Bar getBar(); -} - -class FooImpl implements Foo { - @Inject - private Bar bar; - - public Bar getBar() { - return bar; - } - - public String foo(String msg) { - return msg + "return_foo "; - } - - public void bar(String msg) { - bar.bar(msg); - } - - public void longRunning() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - } - } - - public void throwsException() { - throw new RuntimeException("expected"); - } -} - -interface Bar { - @oneway - void bar(String msg); - - Ext getExt(); -} - -class BarImpl implements Bar { - @Inject - private Ext ext; - - public Ext getExt() { - return ext; - } - - public void bar(String msg) { - } -} - -interface Ext { - void ext(); -} - -class ExtImpl implements Ext { - public void ext() { - } -} - - \ No newline at end of file diff --git a/buildfile b/buildfile index 3e7a26b4d3..1ac7ddeeea 100644 --- a/buildfile +++ b/buildfile @@ -12,7 +12,7 @@ repositories.remote << 'http://scala-tools.org/repo-snapshots' repositories.remote << 'http://www.lag.net/repo' AKKA_KERNEL = 'se.scalablesolutions.akka:akka-kernel:jar:0.1' -AKKA_SUPERVISOR = 'se.scalablesolutions.akka:akka-supervisor:jar:0.1' +#AKKA_SUPERVISOR = 'se.scalablesolutions.akka:akka-supervisor:jar:0.1' AKKA_UTIL_JAVA = 'se.scalablesolutions.akka:akka-util-java:jar:0.1' AKKA_API_JAVA = 'se.scalablesolutions.akka:akka-api-java:jar:0.1' @@ -60,7 +60,7 @@ define 'akka' do desc 'Akka Actor kernel core implementation' define 'kernel' do - compile.with(AKKA_SUPERVISOR, AKKA_UTIL_JAVA, GUICEYFRUIT, MINA_CORE, MINA_SCALA, JERSEY, VOLDEMORT, ZOOKEEPER, SLF4J, GRIZZLY, CONFIGGY, JUNIT4) + compile.with(AKKA_UTIL_JAVA, GUICEYFRUIT, MINA_CORE, MINA_SCALA, JERSEY, VOLDEMORT, ZOOKEEPER, SLF4J, GRIZZLY, CONFIGGY, JUNIT4) test.using :specs package :jar end @@ -74,7 +74,7 @@ define 'akka' do #desc 'Akka Java API' #define 'api-java' do - # compile.with(AKKA_KERNEL, AKKA_SUPERVISOR, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4) + # compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4) # package :jar #end diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index 03a5efc234..35d9f9e837 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import java.util.{List => JList, ArrayList} @@ -29,6 +29,12 @@ class ActiveObjectFactory { * @author Jonas Bonér */ object ActiveObject { + private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = { + val tl = new ThreadLocal[Option[Transaction]] + tl.set(None) + tl + } + def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = { Proxy.newProxyInstance( intf.getClassLoader, @@ -44,11 +50,11 @@ object ActiveObject { } def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = { - object factory extends SupervisorFactory { + object factory extends SupervisorFactory { override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) } val supervisor = factory.newSupervisor - supervisor ! com.scalablesolutions.akka.kernel.Start + supervisor ! se.scalablesolutions.akka.kernel.Start supervisor } @@ -65,46 +71,72 @@ object ActiveObject { * @author Jonas Bonér */ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler { - private val oneway = classOf[com.scalablesolutions.akka.annotation.oneway] - private var targetInstance: AnyRef = _ - private[akka] def setTargetInstance(instance: AnyRef) = targetInstance = instance + val oneway = classOf[se.scalablesolutions.akka.annotation.oneway] - private[ActiveObjectProxy] object dispatcher extends GenericServer { + private[this] var activeTx: Option[Transaction] = None + + private var targetInstance: AnyRef = _ + private[kernel] def setTargetInstance(instance: AnyRef) = targetInstance = instance + + private[this] val dispatcher = new GenericServer { override def body: PartialFunction[Any, Unit] = { case invocation: Invocation => + val tx = invocation.tx try { - reply(ErrRef(invocation.invoke)) + reply(ErrRef(invocation.invoke, tx)) } catch { case e: InvocationTargetException => val te = e.getTargetException te.printStackTrace - reply(ErrRef({ throw te })) + reply(ErrRef({ throw te }, tx)) case e => e.printStackTrace - reply(ErrRef({ throw e })) + reply(ErrRef({ throw e }, tx)) } case 'exit => exit; reply() case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected) } } - private[akka] val server = new GenericServerContainer(target.getName, () => dispatcher) + private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher) server.setTimeout(timeout) - def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = - invoke(Invocation(m, args, targetInstance)) + def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = { + val cflowTx = ActiveObject.threadBoundTx.get + activeTx.get.asInstanceOf[Option[Transaction]] match { + case Some(tx) => + if (cflowTx.isDefined && cflowTx.get != tx) { + // new tx in scope; try to commit + tx.commit(server) + activeTx = None + } + case None => + if (cflowTx.isDefined) activeTx = Some(cflowTx.get) + } + invoke(Invocation(m, args, targetInstance, activeTx)) + } - def invoke(invocation: Invocation): AnyRef = { + private def invoke(invocation: Invocation): AnyRef = { val result: AnyRef = if (invocation.method.isAnnotationPresent(oneway)) server ! invocation else { - val transaction = _ - val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ - throw new ActiveObjectInvocationTimeoutException( - "proxy invocation timed out after " + timeout + " milliseconds") - })) - result() + val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ + throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds") + }, activeTx)) + try { + result() + } catch { + case e => + result.tx match { + case None => // no tx; nothing to do + case Some(tx) => + tx.rollback(server) + ActiveObject.threadBoundTx.set(Some(tx)) + } + throw e + } } + if (activeTx.isDefined) activeTx.get.precommit(server) result } } @@ -114,7 +146,10 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I * * @author Jonas Bonér */ -case class Invocation(val method: Method, val args: Array[Object], val target: AnyRef) { +case class Invocation(val method: Method, + val args: Array[Object], + val target: AnyRef, + val tx: Option[Transaction]) { method.setAccessible(true) def invoke: AnyRef = method.invoke(target, args:_*) @@ -145,7 +180,7 @@ case class Invocation(val method: Method, val args: Array[Object], val target: A a1.size == a2.size && a1.zip(a2).find(t => t._1 == t._2).isDefined) - private def argsToString(array: Array[Object]): String = synchronized { + private def argsToString(array: Array[Object]): String = synchronized { array.foldLeft("(")(_ + " " + _) + ")" } } diff --git a/kernel/src/main/scala/Boot.scala b/kernel/src/main/scala/Boot.scala index f43276d199..147910a2c6 100644 --- a/kernel/src/main/scala/Boot.scala +++ b/kernel/src/main/scala/Boot.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka +package se.scalablesolutions.akka import kernel.Logging import kernel.configuration.ConfigurationException diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/Configuration.scala index 11b1dffe8e..1956c13af3 100755 --- a/kernel/src/main/scala/Configuration.scala +++ b/kernel/src/main/scala/Configuration.scala @@ -2,9 +2,9 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel.configuration +package se.scalablesolutions.akka.kernel.configuration -import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy} +import se.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy} import com.google.inject.{AbstractModule} import java.util.{List => JList, ArrayList} import scala.reflect.BeanProperty @@ -17,42 +17,42 @@ sealed class ConfigurationException(msg: String) extends RuntimeException(msg) sealed abstract class Configuration class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration { - def transform = com.scalablesolutions.akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) + def transform = se.scalablesolutions.akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) } class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration { - def transform = com.scalablesolutions.akka.kernel.LifeCycle(scope.transform, shutdownTime) + def transform = se.scalablesolutions.akka.kernel.LifeCycle(scope.transform, shutdownTime) } abstract class Scope extends Configuration { - def transform: com.scalablesolutions.akka.kernel.Scope + def transform: se.scalablesolutions.akka.kernel.Scope } class Permanent extends Scope { - override def transform = com.scalablesolutions.akka.kernel.Permanent + override def transform = se.scalablesolutions.akka.kernel.Permanent } class Transient extends Scope { - override def transform = com.scalablesolutions.akka.kernel.Transient + override def transform = se.scalablesolutions.akka.kernel.Transient } class Temporary extends Scope { - override def transform = com.scalablesolutions.akka.kernel.Temporary + override def transform = se.scalablesolutions.akka.kernel.Temporary } abstract class FailOverScheme extends Configuration { - def transform: com.scalablesolutions.akka.kernel.FailOverScheme + def transform: se.scalablesolutions.akka.kernel.FailOverScheme } class AllForOne extends FailOverScheme { - override def transform = com.scalablesolutions.akka.kernel.AllForOne + override def transform = se.scalablesolutions.akka.kernel.AllForOne } class OneForOne extends FailOverScheme { - override def transform = com.scalablesolutions.akka.kernel.OneForOne + override def transform = se.scalablesolutions.akka.kernel.OneForOne } abstract class Server extends Configuration //class kernelConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server { -// def transform = com.scalablesolutions.akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) +// def transform = se.scalablesolutions.akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) //} class Component(@BeanProperty val intf: Class[_], @BeanProperty val target: Class[_], @BeanProperty val lifeCycle: LifeCycle, @BeanProperty val timeout: Int) extends Server { - def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform) + def newWorker(proxy: ActiveObjectProxy) = se.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform) } diff --git a/kernel/src/main/scala/DataFlowVariable.scala b/kernel/src/main/scala/DataFlowVariable.scala index 76741352d4..dc9c98049d 100644 --- a/kernel/src/main/scala/DataFlowVariable.scala +++ b/kernel/src/main/scala/DataFlowVariable.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import scala.actors.Actor import scala.actors.OutputChannel diff --git a/kernel/src/main/scala/ErrRef.scala b/kernel/src/main/scala/ErrRef.scala index 38cbc805b8..85e2306681 100644 --- a/kernel/src/main/scala/ErrRef.scala +++ b/kernel/src/main/scala/ErrRef.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel /** * Reference that can hold either a typed value or an exception. @@ -35,16 +35,23 @@ package com.scalablesolutions.akka.kernel * at .() * at Re... * + * + * @author Jonas Bonér */ -class ErrRef[S](s: S){ - private[this] var contents: Either[Throwable, S] = Right(s) - def update(value: => S) = contents = try { Right(value) } catch { case (e : Throwable) => Left(e) } +class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]){ + private[this] var contents: Either[Throwable, Payload] = Right(payload) + + def update(value: => Payload) = { + contents = try { Right(value) } catch { case (e : Throwable) => Left(e) } + } + def apply() = contents match { - case Right(s) => s + case Right(payload) => payload case Left(e) => throw e.fillInStackTrace } + override def toString(): String = "ErrRef[" + contents + "]" } object ErrRef { - def apply[S](s: S) = new ErrRef(s) + def apply[Payload](payload: Payload, tx: Option[Transaction]) = new ErrRef(payload, tx) } diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala index f8eafec25a..86814e0034 100644 --- a/kernel/src/main/scala/GenericServer.scala +++ b/kernel/src/main/scala/GenericServer.scala @@ -2,12 +2,12 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import scala.actors._ import scala.actors.Actor._ -import com.scalablesolutions.akka.kernel.Helpers._ +import se.scalablesolutions.akka.kernel.Helpers._ sealed abstract class GenericServerMessage case class Init(config: AnyRef) extends GenericServerMessage @@ -91,7 +91,7 @@ class GenericServerContainer(val id: String, var serverFactory: () => GenericSer var lifeCycle: Option[LifeCycle] = None val lock = new ReadWriteLock - private var server: GenericServer = null + private var server: GenericServer = _ private var currentConfig: Option[AnyRef] = None private var timeout = 5000 @@ -278,5 +278,15 @@ class GenericServerContainer(val id: String, var serverFactory: () => GenericSer } private[kernel] def getServer: GenericServer = server + + private[kernel] def cloneServerAndReturnOldVersion: GenericServer = lock.withWriteLock { + val oldServer = server + server = Serializer.deepClone(server) + oldServer + } + + private[kernel] def swapServer(newServer: GenericServer) = lock.withWriteLock { + server = newServer + } } diff --git a/kernel/src/main/scala/HashCode.scala b/kernel/src/main/scala/HashCode.scala index d0cd5af2d3..012d807a75 100755 --- a/kernel/src/main/scala/HashCode.scala +++ b/kernel/src/main/scala/HashCode.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import java.lang.reflect.{Array => JArray} import java.lang.{Float => JFloat, Double => JDouble} diff --git a/kernel/src/main/scala/Helpers.scala b/kernel/src/main/scala/Helpers.scala index a97faca625..42a63a19db 100644 --- a/kernel/src/main/scala/Helpers.scala +++ b/kernel/src/main/scala/Helpers.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import java.util.concurrent.locks.ReentrantReadWriteLock diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index 149c2923c4..32a6a83032 100755 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import org.apache.zookeeper.jmx.ManagedUtil import org.apache.zookeeper.server.persistence.FileTxnSnapLog @@ -33,7 +33,7 @@ object Kernel extends Logging { val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/" val JERSEY_SERVER_PORT = 9998 - val JERSEY_REST_CLASSES_ROOT_PACKAGE = "com.scalablesolutions.akka.kernel" + val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel" val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build() val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL diff --git a/kernel/src/main/scala/Logging.scala b/kernel/src/main/scala/Logging.scala index a0fac55b08..dd852ff20f 100755 --- a/kernel/src/main/scala/Logging.scala +++ b/kernel/src/main/scala/Logging.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import net.lag.configgy.Config import net.lag.logging.Logger diff --git a/kernel/src/main/scala/NetCat.scala b/kernel/src/main/scala/NetCat.scala index 25283a9642..ed6ec2b085 100644 --- a/kernel/src/main/scala/NetCat.scala +++ b/kernel/src/main/scala/NetCat.scala @@ -37,8 +37,7 @@ object NetCat { // Set reader idle time to 10 seconds. // sessionIdle(...) method will be invoked when no data is read // for 10 seconds. - val config = immutable.Map.empty[Any, Any] + -Tuple2(IdleTime(IdleStatus.READER_IDLE), 10) + val config = immutable.Map.empty[Any, Any] + Tuple2(IdleTime(IdleStatus.READER_IDLE), 10) session.callReact(SetConfig(config)) { case OK(_) => () case Error(cause) => exit(('setConfigFailed, cause)) @@ -98,8 +97,7 @@ Tuple2(IdleTime(IdleStatus.READER_IDLE), 10) connector.setConnectTimeout(30) // Hook up our code, and start service. - val handlingReference = IoSessionActor.installHandling(connector, -handleSession(_)) + val handlingReference = IoSessionActor.installHandling(connector, handleSession(_)) val cf = connector.connect(new InetSocketAddress(host, port)) cf.awaitUninterruptibly() cf.getSession().getCloseFuture().awaitUninterruptibly() diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala index 9cd05b5b12..eeaab98f2b 100644 --- a/kernel/src/main/scala/Supervisor.scala +++ b/kernel/src/main/scala/Supervisor.scala @@ -2,13 +2,13 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import scala.actors._ import scala.actors.Actor._ import scala.collection.mutable.HashMap -import com.scalablesolutions.akka.kernel.Helpers._ +import se.scalablesolutions.akka.kernel.Helpers._ //==================================================== diff --git a/kernel/src/test/scala/activeObjectSpecs.scala b/kernel/src/test/scala/activeObjectSpecs.scala index 42432acb0c..284153f717 100755 --- a/kernel/src/test/scala/activeObjectSpecs.scala +++ b/kernel/src/test/scala/activeObjectSpecs.scala @@ -2,12 +2,12 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import org.specs.runner.JUnit4 import org.specs.Specification -import com.scalablesolutions.akka.supervisor._ -import com.scalablesolutions.akka.annotation.oneway + +import se.scalablesolutions.akka.annotation.oneway /** * @author Jonas Bonér diff --git a/kernel/src/test/scala/restManagerSpecs.scala b/kernel/src/test/scala/restManagerSpecs.scala index cc02105cca..bdfa2d167a 100755 --- a/kernel/src/test/scala/restManagerSpecs.scala +++ b/kernel/src/test/scala/restManagerSpecs.scala @@ -2,12 +2,18 @@ * Copyright (C) 2009 Scalable Solutions. */ -package com.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel import org.specs.runner.JUnit4 import org.specs.Specification + import javax.ws.rs.{Produces, Path, GET} +//import com.sun.net.httpserver.HttpServer; +//import com.sun.ws.rest.api.client.Client; +//import com.sun.ws.rest.api.client.ClientResponse; +//import com.sun.ws.rest.api.client.ResourceProxy; + /** * @author Jonas Bonér */ @@ -16,7 +22,13 @@ object restManagerSpec extends Specification { "jersey server should be able to start and stop" in { val threadSelector = Kernel.startJersey - threadSelector.stopEndpoint +/* val cc = new DefaultClientConfig + val c = Client.create(cc) + val resource = c.proxy("http://localhost:9998/") + val hello = resource.get(classOf[HelloWorldResource]) + val msg = hello.getMessage + println("=============: " + msg) +*/ threadSelector.stopEndpoint } } diff --git a/util-java/src/main/java/com/scalablesolutions/akka/annotation/oneway.java b/util-java/src/main/java/com/scalablesolutions/akka/annotation/oneway.java deleted file mode 100644 index b60b55ac93..0000000000 --- a/util-java/src/main/java/com/scalablesolutions/akka/annotation/oneway.java +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.annotation; - -import java.lang.annotation.*; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface oneway {}