diff --git a/akka.iws b/akka.iws index 4ff0203169..03d248f4cb 100644 --- a/akka.iws +++ b/akka.iws @@ -1,23 +1,22 @@ - - - - + - + + + + - - - - + + + + - @@ -38,45 +37,6 @@ - - - - - - + + + + + + + + @@ -148,64 +160,91 @@ - - - - - - - - - - + - + - + - - + + - + - - + + - + - - + + - + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -213,33 +252,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -300,6 +312,140 @@ - + @@ -1184,31 +1398,31 @@ - + - - + + - + - - + + - - + + - - + + @@ -1254,116 +1468,116 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + + + + + + + + + + + + + + + + - + + + + + + + + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java new file mode 100644 index 0000000000..ed985ee0df --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java @@ -0,0 +1,5 @@ +package se.scalablesolutions.akka.api; + +public interface InMemFailer { + public void fail(); +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java new file mode 100644 index 0000000000..d8614f3850 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemFailerImpl.java @@ -0,0 +1,7 @@ +package se.scalablesolutions.akka.api; + +public class InMemFailerImpl implements InMemFailer { + public void fail() { + throw new RuntimeException("expected"); + } +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java new file mode 100644 index 0000000000..4358ce1c05 --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -0,0 +1,23 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.annotation.transactional; + +public interface InMemStateful { + // transactional + @transactional + public void success(String key, String msg); + + @transactional + public void failure(String key, String msg, InMemFailer failer); + + //@transactional + //public void clashOk(String key, String msg, InMemClasher clasher); + + //@transactional + //public void clashNotOk(String key, String msg, InMemClasher clasher); + + // non-transactional + public String getState(String key); + + public void setState(String key, String value); +} diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java new file mode 100644 index 0000000000..31420012fe --- /dev/null +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulImpl.java @@ -0,0 +1,40 @@ +package se.scalablesolutions.akka.api; + +import se.scalablesolutions.akka.annotation.state; +import se.scalablesolutions.akka.kernel.TransactionalMap; +import se.scalablesolutions.akka.kernel.InMemoryTransactionalMap; + +public class InMemStatefulImpl implements InMemStateful { + @state + private TransactionalMap state = new InMemoryTransactionalMap(); + + public String getState(String key) { + return state.get(key).get(); + } + + public void setState(String key, String msg) { + state.put(key, msg); + } + + public void success(String key, String msg) { + state.put(key, msg); + } + + public void failure(String key, String msg, InMemFailer failer) { + state.put(key, msg); + failer.fail(); + } + + /* + public void clashOk(String key, String msg, InMemClasher clasher) { + state.put(key, msg); + clasher.clash(); + } + + public void clashNotOk(String key, String msg, InMemClasher clasher) { + state.put(key, msg); + clasher.clash(); + this.success("clash", "clash"); + } + */ +} \ No newline at end of file diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 488ab3e375..00b5514b1c 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -20,17 +20,18 @@ public class InMemoryStateTest extends TestCase { protected void setUp() { conf.configureActiveObjects( new RestartStrategy(new AllForOne(), 3, 5000), - new Component[] { + new Component[] { + // FIXME: remove string-name, add ctor to only accept target class new Component("inmem-stateful", InMemStateful.class, InMemStatefulImpl.class, new LifeCycle(new Permanent(), 1000), 10000000), - new Component("inmem-failer", InMemFailer.class, InMemFailerImpl.class, new LifeCycle(new Permanent(), 1000), 1000), - new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) + new Component("inmem-failer", InMemFailer.class, InMemFailerImpl.class, new LifeCycle(new Permanent(), 1000), 1000) + //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) }).inject().supervise(); } protected void tearDown() { conf.stop(); } - + public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); stateful.setState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state @@ -82,69 +83,7 @@ public class InMemoryStateTest extends TestCase { // } } -interface InMemStateful { - // transactional - @transactional - public void success(String key, String msg); - - @transactional - public void failure(String key, String msg, InMemFailer failer); - - @transactional - public void clashOk(String key, String msg, InMemClasher clasher); - - @transactional - public void clashNotOk(String key, String msg, InMemClasher clasher); - - // non-transactional - public String getState(String key); - - public void setState(String key, String value); -} - -class InMemStatefulImpl implements InMemStateful { - @state - private TransactionalMap state = new InMemoryTransactionalMap(); - - public String getState(String key) { - return state.get(key).get(); - } - - public void setState(String key, String msg) { - state.put(key, msg); - } - - public void success(String key, String msg) { - state.put(key, msg); - } - - public void failure(String key, String msg, InMemFailer failer) { - state.put(key, msg); - failer.fail(); - } - - public void clashOk(String key, String msg, InMemClasher clasher) { - state.put(key, msg); - clasher.clash(); - } - - public void clashNotOk(String key, String msg, InMemClasher clasher) { - state.put(key, msg); - clasher.clash(); - this.success("clash", "clash"); - } -} - -interface InMemFailer { - public void fail(); -} - -class InMemFailerImpl implements InMemFailer { - public void fail() { - throw new RuntimeException("expected"); - } -} - +/* interface InMemClasher { public void clash(); @@ -183,3 +122,4 @@ class InMemClasherImpl implements InMemClasher { // try { Thread.sleep(1000); } catch (InterruptedException e) {} } } +*/ diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml index 2af3d27ef6..e6c8ae4520 100644 --- a/kernel/akka-kernel.iml +++ b/kernel/akka-kernel.iml @@ -141,7 +141,9 @@ - + + + diff --git a/kernel/pom.xml b/kernel/pom.xml index 7a25634c7d..e9634f2d61 100755 --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -25,6 +25,11 @@ scala-library ${scala.version} + + org.codehaus.aspectwerkz + aspectwerkz-nodeps-jdk5 + 2.1 + net.lag configgy diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index dc62d0e98a..a87148be9e 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -9,7 +9,10 @@ import config.ActiveObjectGuiceConfigurator import config.ScalaConfig._ import java.util.{List => JList, ArrayList} -import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException} +import java.lang.reflect.{Method, Field} +import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} +import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} +import org.codehaus.aspectwerkz.proxy.Proxy import java.lang.annotation.Annotation import org.apache.camel.{Processor, Exchange} @@ -31,7 +34,13 @@ object Annotations { * @author Jonas Bonér */ class ActiveObjectFactory { - def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = ActiveObject.newInstance(intf, proxy) + def newInstance[T](target: Class[T], server: GenericServerContainer): T = { + ActiveObject.newInstance(target, server) + } + + def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { + ActiveObject.newInstance(intf, target, server) + } def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor = ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]]) @@ -49,18 +58,17 @@ object ActiveObject { tl } - def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = { - Proxy.newProxyInstance( - intf.getClassLoader, - Array(intf), - proxy).asInstanceOf[T] + def newInstance[T](target: Class[T], server: GenericServerContainer): T = { + val proxy = Proxy.newInstance(target, false, true) + // FIXME switch to weaving in the aspect at compile time + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(target, proxy, server)) + proxy.asInstanceOf[T] } - def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int): T = { - val proxy = new ActiveObjectProxy(intf, target.getClass, timeout) - proxy.setTargetInstance(target) - supervise(proxy) - newInstance(intf, proxy) + def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { + val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(intf, target, server)) + proxy.asInstanceOf[T] } def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = { @@ -72,13 +80,15 @@ object ActiveObject { supervisor } - private def supervise(proxy: ActiveObjectProxy): Supervisor = + /* + private def supervise(proxy: AnyRef): Supervisor = supervise( RestartStrategy(OneForOne, 5, 1000), Worker( proxy.server, LifeCycle(Permanent, 100)) :: Nil) + */ } /** @@ -86,25 +96,21 @@ object ActiveObject { */ // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts -class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler { +sealed class ActorAroundAdvice(target: Class[_], + targetInstance: AnyRef, + val server: GenericServerContainer) extends AroundAdvice { + val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) + server.transactionalRefs = refs + server.transactionalMaps = maps + server.transactionalVectors = vectors + import ActiveObject.threadBoundTx - private[this] var activeTx: Option[Transaction] = None - private[akka] var targetInstance: AnyRef = _ - - private[akka] def setTargetInstance(instance: AnyRef) = { - targetInstance = instance - val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) - server.transactionalRefs = refs - server.transactionalMaps = maps - server.transactionalVectors = vectors - } - - private[akka] val server = new GenericServerContainer(intf.getName, () => new Dispatcher(target.getName)) - server.setTimeout(timeout) - - def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = { - if (m.isAnnotationPresent(Annotations.transactional)) { + + def invoke(joinpoint: JoinPoint): AnyRef = { + // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint + val method = joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod + if (method.isAnnotationPresent(Annotations.transactional)) { if (activeTx.isDefined) { val tx = activeTx.get //val cflowTx = threadBoundTx.get @@ -128,22 +134,23 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I activeTx = Some(currentTx) } activeTx = threadBoundTx.get - invoke(Invocation(m, args, targetInstance, activeTx)) + invoke(joinpoint, activeTx) + //invoke(Invocation(method, joinpoint.getRtti.asInstanceOf[MethodRtti].getParameterValues, targetInstance, activeTx)) } - private def invoke(invocation: Invocation): AnyRef = { + private def invoke(joinpoint: JoinPoint, tx: Option[Transaction]): AnyRef = { val result: AnyRef = /* - if (invocation.target.isInstanceOf[MessageDriven] && - invocation.method.getName == "onMessage") { - val m = invocation.method + if (joinpoint.target.isInstanceOf[MessageDriven] && + joinpoint.method.getName == "onMessage") { + val m = joinpoint.method val endpointName = m.getDeclaringClass.getName + "." + m.getName val activeObjectName = m.getDeclaringClass.getName val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m)) val producer = endpoint.createProducer val exchange = endpoint.createExchange - exchange.getIn().setBody(invocation) + exchange.getIn().setBody(joinpoint) producer.process(exchange) val fault = exchange.getException(); if (fault != null) throw new InvocationTargetException(fault) @@ -151,20 +158,22 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I // FIXME: need some timeout and future here... exchange.getOut.getBody - } else */ - if (invocation.method.isAnnotationPresent(Annotations.oneway)) { - server ! invocation + } else */ + // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint + if (joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod.isAnnotationPresent(Annotations.oneway)) { + server ! (tx, joinpoint) } else { val result: ErrRef[AnyRef] = - server !!! (invocation, { + server !!! ((tx, joinpoint), { var ref = ErrRef(activeTx) - ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds") + ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds") ref }) try { result() } catch { case e => + println("$$$$$$$$$$$$$$ " + joinpoint) rollback(result.tx) throw e } @@ -188,19 +197,19 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I var vectors: List[TransactionalVector[_]] = Nil var refs: List[TransactionalRef[_]] = Nil for { - field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]] + field <- targetInstance.getClass.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]] fieldType = field.getType if fieldType == classOf[TransactionalMap[_, _]] || - fieldType == classOf[TransactionalVector[_]] || - fieldType == classOf[TransactionalRef[_]] + fieldType == classOf[TransactionalVector[_]] || + fieldType == classOf[TransactionalRef[_]] txItem = { field.setAccessible(true) field.get(targetInstance) } if txItem != null } { - if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]] - else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]] + if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]] + else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]] else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]] } (maps, vectors, refs) @@ -215,14 +224,11 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I private[kernel] class Dispatcher(val targetName: String) extends GenericServer { override def body: PartialFunction[Any, Unit] = { - case invocation: Invocation => - val tx = invocation.tx + case (tx: Option[Transaction], joinpoint: JoinPoint) => ActiveObject.threadBoundTx.set(tx) try { - reply(ErrRef(invocation.invoke, tx)) + reply(ErrRef(joinpoint.proceed, tx)) } catch { - case e: InvocationTargetException => - val ref = ErrRef(tx); ref() = throw e.getTargetException; reply(ref) case e => val ref = ErrRef(tx); ref() = throw e; reply(ref) } @@ -254,7 +260,10 @@ private[kernel] case class Invocation(val method: Method, method.setAccessible(true) def invoke: AnyRef = synchronized { - method.invoke(target, args:_*) + println("======== " + this.toString) + if (method.getDeclaringClass.isInterface) { + target.getClass.getDeclaredMethod(method.getName, method.getParameterTypes).invoke(target, args:_*) + } else method.invoke(target, args:_*) } override def toString: String = synchronized { diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala index 8b76df839d..9ecf942619 100644 --- a/kernel/src/main/scala/GenericServer.scala +++ b/kernel/src/main/scala/GenericServer.scala @@ -94,7 +94,7 @@ class GenericServerContainer( private var server: GenericServer = _ private var currentConfig: Option[AnyRef] = None - private var timeout = 5000 + private[kernel] var timeout = 5000 // TODO: see if we can parameterize class and add type safe getActor method //class GenericServerContainer[T <: GenericServer](var factory: () => T) { diff --git a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala index 93b115b758..d48690a33f 100644 --- a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala @@ -31,8 +31,6 @@ trait ActiveObjectConfigurator { */ def getActiveObject[T](clazz: Class[T]): T - def getActiveObjectProxy(clazz: Class[_]): ActiveObjectProxy - def getExternalDependency[T](clazz: Class[T]): T def getComponentInterfaces: List[Class[_]] diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 1463cb7630..6b0732f015 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -16,9 +16,7 @@ import org.apache.camel.{CamelContext, Endpoint, Routes} import scala.collection.mutable.HashMap import kernel.camel.ActiveObjectComponent -import kernel.ActiveObjectFactory -import kernel.ActiveObjectProxy -import kernel.Supervisor +import kernel.{ActiveObjectFactory, Supervisor} import kernel.config.ScalaConfig._ /** @@ -33,7 +31,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private var components: List[Component] = _ private var bindings: List[DependencyBinding] = Nil private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? - private var activeObjectRegistry = new HashMap[Class[_], Tuple3[Class[_], Class[_], ActiveObjectProxy]] + private var activeObjectRegistry = new HashMap[Class[_], Tuple2[Component, GenericServerContainer]] private var activeObjectFactory = new ActiveObjectFactory private var camelContext = new DefaultCamelContext private var modules = new java.util.ArrayList[Module] @@ -48,19 +46,22 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC override def getActiveObject[T](clazz: Class[T]): T = synchronized { log.debug("Creating new active object [%s]", clazz.getName) if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getActiveObject(clazz)") - val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(clazz) + val activeObjectOption: Option[Tuple2[Component, GenericServerContainer]] = activeObjectRegistry.get(clazz) if (activeObjectOption.isDefined) { - val classInfo = activeObjectOption.get - val intfClass = classInfo._1 - val implClass = classInfo._2 - val activeObjectProxy = classInfo._3 - val target = implClass.newInstance - injector.injectMembers(target) - activeObjectProxy.setTargetInstance(target.asInstanceOf[AnyRef]) - activeObjectFactory.newInstance(intfClass, activeObjectProxy).asInstanceOf[T] + val (component, server) = activeObjectOption.get + server.setTimeout(component.timeout) + val proxy = if (component.intf == null) { // subclassing proxy + activeObjectFactory.newInstance(component.target, server).asInstanceOf[T] + } else { // delegating proxy + component.target.getConstructor(Array[Class[_]]()).setAccessible(true) + val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry + activeObjectFactory.newInstance(component.intf, targetInstance, server).asInstanceOf[T] + } + injector.injectMembers(proxy) + proxy } else throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'supervise') method") } - + /* override def getActiveObjectProxy(clazz: Class[_]): ActiveObjectProxy = synchronized { log.debug("Looking up active object proxy [%s]", clazz.getName) if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getActiveObjectProxy(clazz)") @@ -68,7 +69,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC if (activeObjectOption.isDefined) activeObjectOption.get._3 else throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'supervise') method") } - + */ override def getExternalDependency[T](clazz: Class[T]): T = synchronized { injector.getInstance(clazz).asInstanceOf[T] } @@ -108,19 +109,18 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC if (injector == null) inject var workers = new java.util.ArrayList[Worker] for (component <- components) { - val activeObjectProxy = new ActiveObjectProxy(component.intf, component.target, component.timeout) - workers.add(Worker(activeObjectProxy.server, component.lifeCycle)) - activeObjectRegistry.put(component.intf, (component.intf, component.target, activeObjectProxy)) - camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) - for (method <- component.intf.getDeclaredMethods.toList) { - registerMethodForUri(method, component.name) - } + val target = if (component.intf != null) component.intf // TODO: use Option + else component.target + val server = new GenericServerContainer(target.getName, () => new Dispatcher(component.target.getName)) + activeObjectRegistry.put(target, (component, server)) + workers.add(Worker(server, component.lifeCycle)) + //camelContext.getRegistry.asInstanceOf[JndiRegistry].bind(component.name, activeObjectProxy) + for (method <- component.intf.getDeclaredMethods.toList) registerMethodForUri(method, component.name) log.debug("Registering active object in Camel context under the name [%s]", component.target.getName) } supervisor = activeObjectFactory.supervise(restartStrategy, workers) - camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) - camelContext.start - + //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) + //camelContext.start ActiveObjectConfigurator.registerConfigurator(this) this } @@ -154,7 +154,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC def reset = synchronized { modules = new java.util.ArrayList[Module] configRegistry = new HashMap[Class[_], Component] - activeObjectRegistry = new HashMap[Class[_], Tuple3[Class[_], Class[_], ActiveObjectProxy]] + activeObjectRegistry = new HashMap[Class[_], Tuple2[Component, GenericServerContainer]] methodToUriRegistry = new HashMap[Method, String] injector = null restartStrategy = null diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala index 2ad74931f2..68da4fe643 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.kernel.config import akka.kernel.config.JavaConfig._ -import akka.kernel.{Supervisor, ActiveObjectProxy, ActiveObjectFactory} +import akka.kernel.{Supervisor, ActiveObjectFactory} import com.google.inject._ import com.google.inject.jsr250.ResourceProviderFactory diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala index 0b5b593f53..4eeb106b3b 100644 --- a/kernel/src/main/scala/config/Config.scala +++ b/kernel/src/main/scala/config/Config.scala @@ -88,8 +88,8 @@ object JavaConfig { @BeanProperty val timeout: Int) extends Server { def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Component( name, intf, target, lifeCycle.transform, timeout) - def newWorker(proxy: ActiveObjectProxy) = - se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(proxy.server, lifeCycle.transform) + def newWorker(server: GenericServerContainer) = + se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(server, lifeCycle.transform) } } \ No newline at end of file diff --git a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala index 6ef9560a55..bae4bd5f2c 100644 --- a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala +++ b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala @@ -4,21 +4,15 @@ package se.scalablesolutions.akka.kernel.jersey -import com.sun.jersey.core.spi.component.ioc.{IoCManagedComponentProvider, IoCFullyManagedComponentProvider, IoCInstantiatedComponentProvider, IoCComponentProvider} +import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider + import kernel.Logging import config.ActiveObjectConfigurator -import com.sun.jersey.core.spi.component.ComponentProvider - import java.lang.reflect.{Constructor, InvocationTargetException} class ActiveObjectComponentProvider(val clazz: Class[_], val configurator: ActiveObjectConfigurator) - extends IoCManagedComponentProvider with Logging { + extends IoCFullyManagedComponentProvider with Logging { - override def getInstance: AnyRef = - configurator.getActiveObject(clazz).asInstanceOf[AnyRef] - - override def getInjectableInstance(obj: AnyRef): AnyRef = { - obj.asInstanceOf[ActiveObjectProxy].targetInstance - } + override def getInstance: AnyRef = configurator.getActiveObject(clazz).asInstanceOf[AnyRef] } \ No newline at end of file diff --git a/lib/aspectwerkz-nodeps-jdk5-2.1.jar b/lib/aspectwerkz-nodeps-jdk5-2.1.jar index 4459392249..c655479cbe 100644 Binary files a/lib/aspectwerkz-nodeps-jdk5-2.1.jar and b/lib/aspectwerkz-nodeps-jdk5-2.1.jar differ