From 8dbcf6d16755cc11ff645298750b750949ae25ee Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 1 Jul 2010 15:36:27 +0200 Subject: [PATCH 01/19] re #296: Initial support for active object lifecycle management --- .../akka/annotation/shutdown.java | 14 +++++ .../src/main/scala/actor/ActiveObject.scala | 57 ++++++++++++++++--- akka-core/src/main/scala/actor/ActorRef.scala | 8 +-- .../ActiveObjectGuiceConfigurator.scala | 8 ++- .../main/scala/config/SupervisionConfig.scala | 26 ++++++--- 5 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java new file mode 100644 index 0000000000..f806e7bca6 --- /dev/null +++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface shutdown {} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 0201391940..5fd5dad1c7 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -25,6 +25,7 @@ object Annotations { val transactionrequired = classOf[transactionrequired] val prerestart = classOf[prerestart] val postrestart = classOf[postrestart] + val shutdown = classOf[shutdown] val inittransactionalstate = classOf[inittransactionalstate] } @@ -358,7 +359,6 @@ object ActiveObject extends Logging { val proxy = Proxy.newInstance(target, true, false) val context = injectActiveObjectContext(proxy) actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context) - ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) @@ -371,7 +371,6 @@ object ActiveObject extends Logging { val context = injectActiveObjectContext(target) val proxy = Proxy.newInstance(Array(intf), Array(target), true, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context) - ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) @@ -379,6 +378,11 @@ object ActiveObject extends Logging { proxy.asInstanceOf[T] } + def stop(obj: AnyRef): Unit = { + val init = AspectInitRegistry.initFor(obj) + init.actorRef.stop + } + /** * Get the underlying dispatcher actor for the given active object. */ @@ -483,9 +487,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit] def initFor(target: AnyRef) = { - val init = initializations.get(target) - initializations.remove(target) - init + initializations.get(target) } def register(target: AnyRef, init: AspectInit) = { @@ -493,10 +495,17 @@ private[akka] object AspectInitRegistry extends ListenerManagement { foreachListener(_ ! AspectInitRegistered(target, init)) res } + + def unregister(target: AnyRef) = { + val res = initializations.remove(target) + foreachListener(_ ! AspectInitUnregistered(target, res)) + res + } } private[akka] sealed trait AspectInitRegistryEvent private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent +private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent private[akka] sealed case class AspectInit( val target: Class[_], @@ -547,7 +556,10 @@ private[akka] sealed class ActiveObjectAspect { val isOneWay = isVoid(rtti) val sender = ActiveObjectContext.sender.value val senderFuture = ActiveObjectContext.senderFuture.value - if (isOneWay) { + + if (!actorRef.isRunning) { + joinPoint.proceed + } else if (isOneWay) { actorRef ! Invocation(joinPoint, true, true, sender, senderFuture) null.asInstanceOf[AnyRef] } else { @@ -656,10 +668,13 @@ object Dispatcher { * * @author Jonas Bonér */ -private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor { +private[akka] class Dispatcher(transactionalRequired: Boolean, + var restartCallbacks: Option[RestartCallbacks], + var shutdownCallback: Option[ShutdownCallback] = None) extends Actor { import Dispatcher._ private[actor] var target: Option[AnyRef] = None + private var zhutdown: Option[Method] = None private var preRestart: Option[Method] = None private var postRestart: Option[Method] = None private var initTxState: Option[Method] = None @@ -681,7 +696,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op val methods = targetInstance.getClass.getDeclaredMethods.toList // See if we have any config define restart callbacks - callbacks match { + restartCallbacks match { case None => {} case Some(RestartCallbacks(pre, post)) => preRestart = Some(try { @@ -695,10 +710,22 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } + // See if we have any config define a shutdown callback + shutdownCallback match { + case None => {} + case Some(ShutdownCallback(down)) => + zhutdown = Some(try { + targetInstance.getClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*) + } catch { case e => throw new IllegalStateException( + "Could not find shutdown method [" + down + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + } // See if we have any annotation defined restart callbacks if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart)) if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart)) + // See if we have an annotation defined shutdown callback + if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown)) if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( @@ -708,9 +735,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op throw new IllegalStateException( "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0) + throw new IllegalStateException( + "Method annotated with @shutdown or defined as a shutdown callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) + if (zhutdown.isDefined) zhutdown.get.setAccessible(true) // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) @@ -770,6 +802,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op } } + override def shutdown = { + AspectInitRegistry.unregister(target.get); + try { + if (zhutdown.isDefined) { + zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + } + } catch { case e: InvocationTargetException => throw e.getCause } + } + override def initTransactionalState = { try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d07e18a314..bc72a2768e 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -831,10 +831,10 @@ sealed class LocalActorRef private[akka]( } val builder = LifeCycleProtocol.newBuilder lifeCycle match { - case Some(LifeCycle(scope, None)) => + case Some(LifeCycle(scope, None, _)) => setScope(builder, scope) Some(builder.build) - case Some(LifeCycle(scope, Some(callbacks))) => + case Some(LifeCycle(scope, Some(callbacks), _)) => setScope(builder, scope) builder.setPreRestart(callbacks.preRestart) builder.setPostRestart(callbacks.postRestart) @@ -1314,7 +1314,7 @@ sealed class LocalActorRef private[akka]( val failedActor = actorInstance.get failedActor.synchronized { lifeCycle.get match { - case LifeCycle(scope, _) => { + case LifeCycle(scope, _, _) => { scope match { case Permanent => Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) @@ -1343,7 +1343,7 @@ sealed class LocalActorRef private[akka]( linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { - case LifeCycle(scope, _) => { + case LifeCycle(scope, _, _) => { scope match { case Permanent => actorRef.restart(reason) case Temporary => shutDownTemporaryActor(actorRef) diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 8d3a089d26..15e9852181 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -82,7 +82,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, + component.lifeCycle.restartCallbacks, + component.lifeCycle.shutdownCallback)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) @@ -99,7 +101,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val targetClass = component.intf.get val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) - val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, + component.lifeCycle.restartCallbacks, + component.lifeCycle.shutdownCallback)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index dc04d9f38c..11ce0fe69e 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -43,13 +43,15 @@ object ScalaConfig { case object AllForOne extends FailOverScheme case object OneForOne extends FailOverScheme - case class LifeCycle(scope: Scope, callbacks: Option[RestartCallbacks]) extends ConfigElement - object LifeCycle { - def apply(scope: Scope) = new LifeCycle(scope, None) - } + case class LifeCycle(scope: Scope, + restartCallbacks: Option[RestartCallbacks] = None, + shutdownCallback: Option[ShutdownCallback] = None) extends ConfigElement case class RestartCallbacks(preRestart: String, postRestart: String) { if ((preRestart eq null) || (postRestart eq null)) throw new IllegalArgumentException("Restart callback methods can't be null") } + case class ShutdownCallback(shutdown: String) { + if (shutdown eq null) throw new IllegalArgumentException("Shutdown callback method can't be null") + } case object Permanent extends Scope case object Temporary extends Scope @@ -136,17 +138,25 @@ object JavaConfig { scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList) } - class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement { - def this(scope: Scope) = this(scope, null) + class LifeCycle(@BeanProperty val scope: Scope, + @BeanProperty val restartCallbacks: RestartCallbacks, + @BeanProperty val shutdownCallback: ShutdownCallback) extends ConfigElement { + def this(scope: Scope) = this(scope, null, null) + def this(scope: Scope, restartCallbacks: RestartCallbacks) = this(scope, restartCallbacks, null) + def this(scope: Scope, shutdownCallback: ShutdownCallback) = this(scope, null, shutdownCallback) def transform = { - val callbackOption = if (callbacks eq null) None else Some(callbacks.transform) - se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, callbackOption) + val restartCallbacksOption = if (restartCallbacks eq null) None else Some(restartCallbacks.transform) + val shutdownCallbackOption = if (shutdownCallback eq null) None else Some(shutdownCallback.transform) + se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, restartCallbacksOption, shutdownCallbackOption) } } class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) { def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks(preRestart, postRestart) } + class ShutdownCallback(@BeanProperty val shutdown: String) { + def transform = se.scalablesolutions.akka.config.ScalaConfig.ShutdownCallback(shutdown) + } abstract class Scope extends ConfigElement { def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope From cc8a112dee3badce11b36427840bb28115a04021 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 1 Jul 2010 15:37:21 +0200 Subject: [PATCH 02/19] Additional remote consumer test --- akka-camel/src/test/scala/RemoteConsumerTest.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala index 149585df4b..7e3b666590 100644 --- a/akka-camel/src/test/scala/RemoteConsumerTest.scala +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -55,14 +55,13 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh } } - /* TODO: enable once issues with remote active objects are resolved feature("Client-initiated remote consumer active object") { scenario("access published remote consumer method") { given("a client-initiated remote consumer active object") val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port) when("remote consumer publication is triggered") - val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get + val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get consumer.foo("init") assert(latch.await(5000, TimeUnit.MILLISECONDS)) @@ -71,7 +70,6 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh assert(response === "remote active object: test") } } - */ } object RemoteConsumerTest { From 09b78f4695782f7be4d3a289744bbb6546361fd0 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 1 Jul 2010 16:42:54 +0200 Subject: [PATCH 03/19] re #297: Initial suport for shutting down routes to consumer active objects (both supervised and non-supervised). --- .../src/main/scala/ConsumerPublisher.scala | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index 953ac90746..ad69c92d67 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -47,6 +47,15 @@ private[camel] object ConsumerPublisher extends Logging { CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri)) } + + def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) { + val targetMethod = event.method.getName + val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) + + CamelContextManager.activeObjectRegistry.remove(objectId) + CamelContextManager.context.stopRoute(objectId) + log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.activeObject, event.uri)) + } } /** @@ -76,8 +85,12 @@ private[camel] class ConsumerPublisher extends Actor { handleConsumerUnregistered(u) latch.countDown // needed for testing only. } - case d: ConsumerMethodRegistered => { - handleConsumerMethodRegistered(d) + case mr: ConsumerMethodRegistered => { + handleConsumerMethodRegistered(mr) + latch.countDown // needed for testing only. + } + case mu: ConsumerMethodUnregistered => { + handleConsumerMethodUnregistered(mu) latch.countDown // needed for testing only. } case SetExpectedMessageCount(num) => { @@ -171,6 +184,8 @@ private[camel] class PublishRequestor extends Actor { for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event) case AspectInitRegistered(proxy, init) => for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event) + case AspectInitUnregistered(proxy, init) => + for (event <- ConsumerMethodUnregistered.forConsumer(proxy, init)) deliverCurrentEvent(event) case PublishRequestorInit(pub) => { publisher = Some(pub) deliverBufferedEvents @@ -244,6 +259,8 @@ private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, */ private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent +private[camel] case class ConsumerMethodUnregistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent + /** * @author Martin Krasser */ @@ -291,6 +308,14 @@ private[camel] object ConsumerMethodRegistered { } } +private[camel] object ConsumerMethodUnregistered { + def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = { + if (init.remoteAddress.isDefined) Nil + else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) + yield ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } +} + /** * Describes a consumer actor with elements that are relevant for publishing an actor at a * Camel endpoint (or unpublishing an actor from an endpoint). From f964e64ec27713a5ed649f2c28a522b8afa2c8c5 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sat, 3 Jul 2010 14:49:43 +0200 Subject: [PATCH 04/19] Track stopping of Dispatcher actor --- akka-core/src/main/scala/actor/ActiveObject.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 5fd5dad1c7..3579a96041 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -526,6 +526,7 @@ private[akka] sealed case class AspectInit( @Aspect("perInstance") private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false + @volatile private var isStopped = false private var target: Class[_] = _ private var actorRef: ActorRef = _ private var remoteAddress: Option[InetSocketAddress] = _ @@ -557,7 +558,8 @@ private[akka] sealed class ActiveObjectAspect { val sender = ActiveObjectContext.sender.value val senderFuture = ActiveObjectContext.senderFuture.value - if (!actorRef.isRunning) { + if (!actorRef.isRunning && !isStopped) { + isStopped = true joinPoint.proceed } else if (isOneWay) { actorRef ! Invocation(joinPoint, true, true, sender, senderFuture) From 0b6ff861c86af45370fe6faea0b78b2d511f9e35 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sun, 4 Jul 2010 17:06:40 +0200 Subject: [PATCH 05/19] Tests for ActiveObject lifecycle --- .../scala/ConsumerMethodRegisteredTest.scala | 21 ++- .../akka/actor/SamplePojo.java | 37 +++++ .../akka/actor/SamplePojoAnnotated.java | 52 +++++++ .../scala/ActiveObjectLifecycleSpec.scala | 145 ++++++++++++++++++ 4 files changed, 250 insertions(+), 5 deletions(-) create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java create mode 100644 akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala diff --git a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala index 493030fa4a..7c28c7d8ee 100644 --- a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala +++ b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala @@ -2,21 +2,19 @@ package se.scalablesolutions.akka.camel import java.net.InetSocketAddress -import org.junit.Test import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject} import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._ +import org.junit.{AfterClass, Test} class ConsumerMethodRegisteredTest extends JUnitSuite { + import ConsumerMethodRegisteredTest._ + val remoteAddress = new InetSocketAddress("localhost", 8888); val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000) val localAspectInit = AspectInit(classOf[String], null, None, 1000) - val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) - val activePojoSub = ActiveObject.newInstance(classOf[PojoSub]) - val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) - val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) => r1.method.getName < r2.method.getName @@ -44,3 +42,16 @@ class ConsumerMethodRegisteredTest extends JUnitSuite { } } + +object ConsumerMethodRegisteredTest { + val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) + val activePojoSub = ActiveObject.newInstance(classOf[PojoSub]) + val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) + + @AfterClass + def afterClass = { + ActiveObject.stop(activePojoBase) + ActiveObject.stop(activePojoSub) + ActiveObject.stop(activePojoIntf) + } +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java new file mode 100644 index 0000000000..50f3e43221 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java @@ -0,0 +1,37 @@ +package se.scalablesolutions.akka.actor; + +import java.util.concurrent.CountDownLatch; + +public class SamplePojo { + + private CountDownLatch latch; + + public boolean _pre = false; + public boolean _post = false; + public boolean _down = false; + + public CountDownLatch newCountdownLatch(int count) { + latch = new CountDownLatch(count); + return latch; + } + + public String fail() { + throw new RuntimeException("expected"); + } + + public void pre() { + _pre = true; + latch.countDown(); + } + + public void post() { + _post = true; + latch.countDown(); + } + + public void down() { + _down = true; + latch.countDown(); + } + +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java new file mode 100644 index 0000000000..8bf4ba36d3 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java @@ -0,0 +1,52 @@ +package se.scalablesolutions.akka.actor; + +import se.scalablesolutions.akka.actor.annotation.postrestart; +import se.scalablesolutions.akka.actor.annotation.prerestart; +import se.scalablesolutions.akka.actor.annotation.shutdown; + +import java.util.concurrent.CountDownLatch; + +public class SamplePojoAnnotated { + + private CountDownLatch latch; + + public boolean _pre = false; + public boolean _post = false; + public boolean _down = false; + + public SamplePojoAnnotated() { + latch = new CountDownLatch(1); + } + + public CountDownLatch newCountdownLatch(int count) { + latch = new CountDownLatch(count); + return latch; + } + + public String greet(String s) { + return "hello " + s; + } + + public String fail() { + throw new RuntimeException("expected"); + } + + @prerestart + public void pre() { + _pre = true; + latch.countDown(); + } + + @postrestart + public void post() { + _post = true; + latch.countDown(); + } + + @shutdown + public void down() { + _down = true; + latch.countDown(); + } + +} \ No newline at end of file diff --git a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala new file mode 100644 index 0000000000..d6fd5e795e --- /dev/null +++ b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala @@ -0,0 +1,145 @@ +package se.scalablesolutions.akka.actor + +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfterAll, Spec} +import org.scalatest.junit.JUnitRunner +import org.scalatest.matchers.ShouldMatchers + +import se.scalablesolutions.akka.config.ActiveObjectConfigurator +import se.scalablesolutions.akka.config.JavaConfig._ + +/** + * @author Martin Krasser + */ +@RunWith(classOf[JUnitRunner]) +class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { + var conf1: ActiveObjectConfigurator = _ + var conf2: ActiveObjectConfigurator = _ + var conf3: ActiveObjectConfigurator = _ + var conf4: ActiveObjectConfigurator = _ + + override protected def beforeAll() = { + val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception])) + val comp1 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Permanent()), 1000) + val comp2 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Temporary()), 1000) + val comp3 = new Component(classOf[SamplePojo], new LifeCycle(new Permanent(), new RestartCallbacks("pre", "post")), 1000) + val comp4 = new Component(classOf[SamplePojo], new LifeCycle(new Temporary(), new ShutdownCallback("down")), 1000) + conf1 = new ActiveObjectConfigurator().configure(strategy, Array(comp1)).supervise + conf2 = new ActiveObjectConfigurator().configure(strategy, Array(comp2)).supervise + conf3 = new ActiveObjectConfigurator().configure(strategy, Array(comp3)).supervise + conf4 = new ActiveObjectConfigurator().configure(strategy, Array(comp4)).supervise + } + + override protected def afterAll() = { + conf1.stop + conf2.stop + conf3.stop + conf4.stop + } + + it("should restart supervised, annotated active object on failure") { + val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) + val cdl = obj.newCountdownLatch(2) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(obj._pre) + assert(obj._post) + assert(!obj._down) + assert(AspectInitRegistry.initFor(obj) ne null) + } + } + } + + it("should shutdown supervised, annotated active object on failure") { + val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) + val cdl = obj.newCountdownLatch(1) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + assert(AspectInitRegistry.initFor(obj) eq null) + } + } + } + + it("should restart supervised, non-annotated active object on failure") { + val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = obj.newCountdownLatch(2) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(obj._pre) + assert(obj._post) + assert(!obj._down) + assert(AspectInitRegistry.initFor(obj) ne null) + } + } + } + + it("should shutdown supervised, non-annotated active object on failure") { + val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = obj.newCountdownLatch(1) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + assert(AspectInitRegistry.initFor(obj) eq null) + } + } + } + + it("should shutdown non-supervised, annotated active object on ActiveObject.stop") { + val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) + assert(AspectInitRegistry.initFor(obj) ne null) + assert("hello akka" === obj.greet("akka")) + ActiveObject.stop(obj) + assert(AspectInitRegistry.initFor(obj) eq null) + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + try { + obj.greet("akka") + fail("access to stopped active object") + } catch { + case e: Exception => { /* test passed */ } + } + } + + it("should shutdown non-supervised, annotated active object on ActorRegistry.shutdownAll") { + val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) + assert(AspectInitRegistry.initFor(obj) ne null) + assert("hello akka" === obj.greet("akka")) + ActorRegistry.shutdownAll + assert(AspectInitRegistry.initFor(obj) eq null) + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + try { + obj.greet("akka") + fail("access to stopped active object") + } catch { + case e: Exception => { /* test passed */ } + } + } +} \ No newline at end of file From 94a0bed6f0d3ab95ea6e1edcb8db4523eb385129 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sun, 4 Jul 2010 17:19:46 +0200 Subject: [PATCH 06/19] Added comments. --- .../src/main/scala/ConsumerPublisher.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index ad69c92d67..8e0062f065 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -48,6 +48,9 @@ private[camel] object ConsumerPublisher extends Logging { log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri)) } + /** + * Stops route to the already un-registered consumer actor method. + */ def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) { val targetMethod = event.method.getName val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) @@ -107,7 +110,6 @@ private[camel] class ConsumerPublisher extends Actor { */ private[camel] case class SetExpectedMessageCount(num: Int) - /** * Defines an abstract route to a target which is either an actor or an active object method.. * @@ -259,6 +261,18 @@ private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, */ private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent +/** + * Event indicating that an active object has been stopped. For each + * @consume annotated POJO method a separate instance of this class is + * created. + * + * @param activeObject active object (proxy). + * @param init + * @param uri endpoint URI of the active object method + * @param method method to be un-published. + * + * @author Martin Krasser + */ private[camel] case class ConsumerMethodUnregistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent /** @@ -309,6 +323,11 @@ private[camel] object ConsumerMethodRegistered { } private[camel] object ConsumerMethodUnregistered { + /** + * Creates a list of ConsumerMethodUnregistered event messages for an active object or an empty + * list if the active object is a proxy for an remote active object or the active object doesn't + * have any @consume annotated methods. + */ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = { if (init.remoteAddress.isDefined) Nil else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) From 81745f1122370409475327660327497283eea6a6 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sun, 4 Jul 2010 19:51:55 +0200 Subject: [PATCH 07/19] Added test subject description --- .../scala/ActiveObjectLifecycleSpec.scala | 180 +++++++++--------- 1 file changed, 91 insertions(+), 89 deletions(-) diff --git a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala index d6fd5e795e..406d463324 100644 --- a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala +++ b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala @@ -37,109 +37,111 @@ class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndA conf4.stop } - it("should restart supervised, annotated active object on failure") { - val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) - val cdl = obj.newCountdownLatch(2) - assert(AspectInitRegistry.initFor(obj) ne null) - try { - obj.fail - fail("expected exception not thrown") - } catch { - case e: RuntimeException => { - cdl.await - assert(obj._pre) - assert(obj._post) - assert(!obj._down) - assert(AspectInitRegistry.initFor(obj) ne null) + describe("ActiveObject lifecycle management") { + it("should restart supervised, annotated active object on failure") { + val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) + val cdl = obj.newCountdownLatch(2) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(obj._pre) + assert(obj._post) + assert(!obj._down) + assert(AspectInitRegistry.initFor(obj) ne null) + } } } - } - it("should shutdown supervised, annotated active object on failure") { - val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) - val cdl = obj.newCountdownLatch(1) - assert(AspectInitRegistry.initFor(obj) ne null) - try { - obj.fail - fail("expected exception not thrown") - } catch { - case e: RuntimeException => { - cdl.await - assert(!obj._pre) - assert(!obj._post) - assert(obj._down) - assert(AspectInitRegistry.initFor(obj) eq null) + it("should shutdown supervised, annotated active object on failure") { + val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated]) + val cdl = obj.newCountdownLatch(1) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + assert(AspectInitRegistry.initFor(obj) eq null) + } } } - } - it("should restart supervised, non-annotated active object on failure") { - val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo]) - val cdl = obj.newCountdownLatch(2) - assert(AspectInitRegistry.initFor(obj) ne null) - try { - obj.fail - fail("expected exception not thrown") - } catch { - case e: RuntimeException => { - cdl.await - assert(obj._pre) - assert(obj._post) - assert(!obj._down) - assert(AspectInitRegistry.initFor(obj) ne null) + it("should restart supervised, non-annotated active object on failure") { + val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = obj.newCountdownLatch(2) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(obj._pre) + assert(obj._post) + assert(!obj._down) + assert(AspectInitRegistry.initFor(obj) ne null) + } } } - } - it("should shutdown supervised, non-annotated active object on failure") { - val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo]) - val cdl = obj.newCountdownLatch(1) - assert(AspectInitRegistry.initFor(obj) ne null) - try { - obj.fail - fail("expected exception not thrown") - } catch { - case e: RuntimeException => { - cdl.await - assert(!obj._pre) - assert(!obj._post) - assert(obj._down) - assert(AspectInitRegistry.initFor(obj) eq null) + it("should shutdown supervised, non-annotated active object on failure") { + val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = obj.newCountdownLatch(1) + assert(AspectInitRegistry.initFor(obj) ne null) + try { + obj.fail + fail("expected exception not thrown") + } catch { + case e: RuntimeException => { + cdl.await + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + assert(AspectInitRegistry.initFor(obj) eq null) + } } } - } - it("should shutdown non-supervised, annotated active object on ActiveObject.stop") { - val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) - assert(AspectInitRegistry.initFor(obj) ne null) - assert("hello akka" === obj.greet("akka")) - ActiveObject.stop(obj) - assert(AspectInitRegistry.initFor(obj) eq null) - assert(!obj._pre) - assert(!obj._post) - assert(obj._down) - try { - obj.greet("akka") - fail("access to stopped active object") - } catch { - case e: Exception => { /* test passed */ } + it("should shutdown non-supervised, annotated active object on ActiveObject.stop") { + val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) + assert(AspectInitRegistry.initFor(obj) ne null) + assert("hello akka" === obj.greet("akka")) + ActiveObject.stop(obj) + assert(AspectInitRegistry.initFor(obj) eq null) + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + try { + obj.greet("akka") + fail("access to stopped active object") + } catch { + case e: Exception => { /* test passed */ } + } } - } - it("should shutdown non-supervised, annotated active object on ActorRegistry.shutdownAll") { - val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) - assert(AspectInitRegistry.initFor(obj) ne null) - assert("hello akka" === obj.greet("akka")) - ActorRegistry.shutdownAll - assert(AspectInitRegistry.initFor(obj) eq null) - assert(!obj._pre) - assert(!obj._post) - assert(obj._down) - try { - obj.greet("akka") - fail("access to stopped active object") - } catch { - case e: Exception => { /* test passed */ } + it("should shutdown non-supervised, annotated active object on ActorRegistry.shutdownAll") { + val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) + assert(AspectInitRegistry.initFor(obj) ne null) + assert("hello akka" === obj.greet("akka")) + ActorRegistry.shutdownAll + assert(AspectInitRegistry.initFor(obj) eq null) + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + try { + obj.greet("akka") + fail("access to stopped active object") + } catch { + case e: Exception => { /* test passed */ } + } } } } \ No newline at end of file From cd457ee00b5e2ee681a50ed3fe3ce2213350254b Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 5 Jul 2010 12:48:26 +0200 Subject: [PATCH 08/19] Tests for stopping active object endpoints; minor refactoring in ConsumerPublisher --- .../src/main/scala/ConsumerPublisher.scala | 35 ++++++++++++----- .../test/scala/CamelServiceFeatureTest.scala | 38 +++++++++++++++++-- .../src/test/scala/PublishRequestorTest.scala | 13 +++++++ 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index 8e0062f065..8d29739f02 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -303,6 +303,26 @@ private[camel] object ConsumerUnregistered { } } +/** + * @author Martin Krasser + */ +private[camel] object ConsumerMethod { + /** + * Applies a function f to each consumer method of activeObject and + * returns the function results as a list. A consumer method is one that is annotated with + * @consume. If activeObject is a proxy for a remote active object + * f is never called and Nil is returned. + */ + def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = { + // TODO: support consumer annotation inheritance + // - visit overridden methods in superclasses + // - visit implemented method declarations in interfaces + if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints + else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) + yield f(m) + } +} + /** * @author Martin Krasser */ @@ -313,12 +333,9 @@ private[camel] object ConsumerMethodRegistered { * have any @consume annotated methods. */ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = { - // TODO: support consumer annotation inheritance - // - visit overridden methods in superclasses - // - visit implemented method declarations in interfaces - if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints - else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) - yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) { + m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } } } @@ -329,9 +346,9 @@ private[camel] object ConsumerMethodUnregistered { * have any @consume annotated methods. */ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = { - if (init.remoteAddress.isDefined) Nil - else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) - yield ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) { + m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } } } diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala index 771ed83af3..1e88b62bf2 100644 --- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi feature("Unpublish registered consumer actor from the global CamelContext") { - scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") { + scenario("access to unregistered consumer actor via Camel direct-endpoint fails") { val endpointUri = "direct:unpublish-test-1" given("a consumer actor that has been stopped") @@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi when("a request is sent to this actor") val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1") - then("the direct endpoint falls back to its default behaviour and returns the original message") + then("the direct-endpoint falls back to its default behaviour and returns the original message") assert(response1 === "msg1") } } @@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access active object methods via Camel direct-endpoints") { given("an active object registered after CamelService startup") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get - ActiveObject.newInstance(classOf[PojoBase]) + var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + val obj = ActiveObject.newInstance(classOf[PojoBase]) assert(latch.await(5000, TimeUnit.MILLISECONDS)) when("requests are sent to published methods") @@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response1 === "m2base: x y") assert(response2 === "m3base: x y") assert(response3 === "m4base: x y") + + // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints) + latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + ActiveObject.stop(obj) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + } + } + + feature("Unpublish active object method from the global CamelContext") { + + scenario("access to unregistered active object methof via Camel direct-endpoint fails") { + + given("an active object that has been stopped") + var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + val obj = ActiveObject.newInstance(classOf[PojoBase]) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + ActiveObject.stop(obj) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + when("requests are sent to published methods") + val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y") + val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y") + val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y") + + then("the direct-endpoints fall back to their default behaviour and return the original message") + assert(response1 === "x") + assert(response2 === "x") + assert(response3 === "x") } } } diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala index 7729e6eec6..44c6c30684 100644 --- a/akka-camel/src/test/scala/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/PublishRequestorTest.scala @@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite { assert(event.method.getName === "foo") } + @Test def shouldReceiveConsumerMethodUnregisteredEvent = { + val obj = ActiveObject.newInstance(classOf[PojoSingle]) + val init = AspectInit(classOf[PojoSingle], null, None, 1000) + val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get + requestor ! AspectInitUnregistered(obj, init) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered] + assert(event.init === init) + assert(event.uri === "direct:foo") + assert(event.activeObject === obj) + assert(event.method.getName === "foo") + } + @Test def shouldReceiveConsumerRegisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get requestor ! ActorRegistered(consumer) From 72e5181814b512b934225f44bb02d4c18095d414 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 5 Jul 2010 13:45:03 +0200 Subject: [PATCH 09/19] - moved receive timeout logic to ActorRef - receivetimeout now only inititiated when receiveTimeout property is set --- akka-core/src/main/scala/actor/Actor.scala | 23 +-------- akka-core/src/main/scala/actor/ActorRef.scala | 48 ++++++++++++++----- .../src/test/scala/ReceiveTimeoutSpec.scala | 20 ++++++-- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 33bf9bf998..b38401a4a6 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim */ object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** @@ -435,7 +434,6 @@ trait Actor extends Logging { // ========================================= private[akka] def base: Receive = try { - cancelReceiveTimeout lifeCycles orElse (self.hotswap getOrElse receive) } catch { case e: NullPointerException => throw new IllegalActorStateException( @@ -443,7 +441,7 @@ trait Actor extends Logging { } private val lifeCycles: Receive = { - case HotSwap(code) => self.hotswap = code; checkReceiveTimeout + case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap? case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) @@ -451,25 +449,6 @@ trait Actor extends Logging { case UnlinkAndStop(child) => self.unlink(child); child.stop case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - - @volatile protected[akka] var timeoutActor: Option[ActorRef] = None - - private[akka] def cancelReceiveTimeout = { - timeoutActor.foreach { - x => - Scheduler.unschedule(x) - timeoutActor = None - log.debug("Timeout canceled") - } - } - - private[akka] def checkReceiveTimeout = { - //if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes - if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) { - log.debug("Scheduling timeout for Actor [" + toString + "]") - timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS)) - } - } } private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index e10a55e3b1..8fbec1ed85 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque} import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} import java.lang.reflect.Field import RemoteActorSerialization._ import com.google.protobuf.ByteString +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement { @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) + @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None + @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[this] val guard = new ReentrantGuard @@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement { * User overridable callback/setting. *

* Defines the default timeout for an initial receive invocation. - * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ - @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT + @volatile var receiveTimeout: Option[Long] = None /** * User overridable callback/setting. @@ -386,7 +388,7 @@ trait ActorRef extends TransactionManagement { * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired(): Unit + def makeTransactionRequired: Unit /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. @@ -429,12 +431,12 @@ trait ActorRef extends TransactionManagement { * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit() = stop() + def exit = stop /** * Shuts down the actor its dispatcher and message queue. */ - def stop(): Unit + def stop: Unit /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -510,7 +512,7 @@ trait ActorRef extends TransactionManagement { /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit + def shutdownLinkedActors: Unit protected[akka] def invoke(messageHandle: MessageInvocation): Unit @@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement { } override def toString = "Actor[" + id + ":" + uuid + "]" + + protected[akka] def cancelReceiveTimeout = { + _timeoutActor.foreach { + x => + Scheduler.unschedule(x) + _timeoutActor = None + log.debug("Timeout canceled") + } + } + + protected [akka] def checkReceiveTimeout = { + cancelReceiveTimeout + receiveTimeout.foreach { timeout => + log.debug("Scheduling timeout for Actor [" + toString + "]") + _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } /** @@ -680,7 +700,7 @@ sealed class LocalActorRef private[akka]( * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired() = guard.withGuard { + def makeTransactionRequired = guard.withGuard { if (!isRunning || isBeingRestarted) isTransactor = true else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") @@ -736,6 +756,7 @@ sealed class LocalActorRef private[akka]( */ def stop = guard.withGuard { if (isRunning) { + cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None _isRunning = false @@ -873,7 +894,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit = guard.withGuard { + def shutdownLinkedActors: Unit = guard.withGuard { linkedActorsAsList.foreach(_.stop) linkedActors.clear } @@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka]( setTransactionSet(txSet) try { + cancelReceiveTimeout // FIXME: leave this here? if (isTransactor) { val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) atomic(txFactory) { @@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka]( ActorRegistry.register(this) if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name) clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body - actor.checkReceiveTimeout + checkReceiveTimeout } private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { @@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] ( this } - def stop(): Unit = { + def stop: Unit = { _isRunning = false _isShutDown = true } @@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] ( def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported - def makeTransactionRequired(): Unit = unsupported + def makeTransactionRequired: Unit = unsupported def transactionConfig_=(config: TransactionConfig): Unit = unsupported def transactionConfig: TransactionConfig = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported @@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] ( def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported - def shutdownLinkedActors(): Unit = unsupported + def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala index 938cb08d43..5c50337894 100644 --- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch case object Tick val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case Tick => () @@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite { }).start timeoutActor ! Tick - assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false) + assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false) + } + + @Test def timeoutShouldNotBeSentWhenNotSpecified = { + val timeoutLatch = new StandardLatch + val timeoutActor = actorOf(new Actor { + + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false) } } From e877064e17e2fa678ae4a9ec4715b06731e15d83 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 5 Jul 2010 13:57:53 +0200 Subject: [PATCH 10/19] set emtpy parens back --- akka-core/src/main/scala/actor/ActorRef.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 8fbec1ed85..447931019a 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -388,7 +388,7 @@ trait ActorRef extends TransactionManagement { * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired: Unit + def makeTransactionRequired(): Unit /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. @@ -431,12 +431,12 @@ trait ActorRef extends TransactionManagement { * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit = stop + def exit() = stop() /** * Shuts down the actor its dispatcher and message queue. */ - def stop: Unit + def stop(): Unit /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -512,7 +512,7 @@ trait ActorRef extends TransactionManagement { /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit + def shutdownLinkedActors(): Unit protected[akka] def invoke(messageHandle: MessageInvocation): Unit @@ -700,7 +700,7 @@ sealed class LocalActorRef private[akka]( * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired = guard.withGuard { + def makeTransactionRequired() = guard.withGuard { if (!isRunning || isBeingRestarted) isTransactor = true else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") @@ -754,7 +754,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down the actor its dispatcher and message queue. */ - def stop = guard.withGuard { + def stop() = guard.withGuard { if (isRunning) { cancelReceiveTimeout dispatcher.unregister(this) @@ -894,7 +894,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit = guard.withGuard { + def shutdownLinkedActors(): Unit = guard.withGuard { linkedActorsAsList.foreach(_.stop) linkedActors.clear } From e219b40048e09820faa1ba40c4585da67b57d13f Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Mon, 5 Jul 2010 15:53:49 +0200 Subject: [PATCH 11/19] #304 Fixed Support for ApplicationContextAware in akka-spring --- .../src/main/scala/ActiveObjectFactoryBean.scala | 10 ++++++++-- .../se/scalablesolutions/akka/spring/SampleBean.java | 11 ++++++++++- akka-spring/src/test/resources/appContext.xml | 4 +++- .../src/test/scala/ActiveObjectFactoryBeanTest.scala | 4 ++++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 4f6ea37148..1e669114ce 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -12,6 +12,7 @@ import org.springframework.beans.BeanWrapper import org.springframework.beans.BeanUtils import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils +import org.springframework.context.{ApplicationContext,ApplicationContextAware} import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean import se.scalablesolutions.akka.actor.ActiveObject @@ -26,7 +27,7 @@ import se.scalablesolutions.akka.util.Logging * @author michaelkober * @author Johan Rask */ -class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { +class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { import StringReflect._ import AkkaSpringConfigurationTags._ @@ -42,6 +43,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope:String = VAL_SCOPE_SINGLETON @BeanProperty var property:PropertyEntries = _ + @BeanProperty var applicationContext:ApplicationContext = _ /* * @see org.springframework.beans.factory.FactoryBean#getObjectType() @@ -78,7 +80,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { private def setProperties(ref:AnyRef) : AnyRef = { log.debug("Processing properties and dependencies for target class %s",target) val beanWrapper = new BeanWrapperImpl(ref); - for(entry <- property.entryList) { + if(ref.isInstanceOf[ApplicationContextAware]) { + log.debug("Setting application context") + beanWrapper.setPropertyValue("applicationContext",applicationContext) + } + for(entry <- property.entryList) { val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name) val method = propertyDescriptor.getWriteMethod(); diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java index 37953173ec..7e29f5588d 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java @@ -1,6 +1,15 @@ package se.scalablesolutions.akka.spring; -public class SampleBean { +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationContext; + +public class SampleBean implements ApplicationContextAware { + + public boolean gotApplicationContext = false; + + public void setApplicationContext(ApplicationContext context) { + gotApplicationContext = true; + } public String foo(String s) { return "hello " + s; diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml index 5648fa2fdf..2da3ae0c00 100644 --- a/akka-spring/src/test/resources/appContext.xml +++ b/akka-spring/src/test/resources/appContext.xml @@ -15,7 +15,9 @@ - + + + \ No newline at end of file diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index f1c11d0eee..39544804da 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -69,6 +69,10 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { var ctx = new ClassPathXmlApplicationContext("appContext.xml"); val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor] assert(target.getSource === "someString") + + val sampleBean = ctx.getBean("sample").asInstanceOf[SampleBean]; + Thread.sleep(300) + assert(sampleBean.gotApplicationContext) } } } From 959873d51ab6a4acbf8220238ac647e2a3ec1a27 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 6 Jul 2010 07:00:04 +0200 Subject: [PATCH 12/19] closes #314 akka-spring to support active object lifecycle management closes #315 akka-spring to support configuration of shutdown callback method --- .../src/main/scala/actor/ActiveObject.scala | 33 ++++--- .../scala/ActiveObjectLifecycleSpec.scala | 8 ++ .../akka/spring/akka-0.10.xsd | 14 ++- .../main/scala/ActiveObjectFactoryBean.scala | 92 +++++++++---------- .../src/main/scala/ActiveObjectParser.scala | 14 ++- .../main/scala/ActiveObjectProperties.scala | 5 +- .../scala/AkkaSpringConfigurationTags.scala | 3 + .../akka/spring/SampleBean.java | 15 ++- akka-spring/src/test/resources/appContext.xml | 29 +++--- ...ActiveObjectBeanDefinitionParserTest.scala | 8 +- .../scala/ActiveObjectFactoryBeanTest.scala | 16 ++++ .../SupervisionBeanDefinitionParserTest.scala | 21 ++++- 12 files changed, 168 insertions(+), 90 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index ff103b3e81..a545f9f633 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -37,6 +37,7 @@ object Annotations { final class ActiveObjectConfiguration { private[akka] var _timeout: Long = Actor.TIMEOUT private[akka] var _restartCallbacks: Option[RestartCallbacks] = None + private[akka] var _shutdownCallback: Option[ShutdownCallback] = None private[akka] var _transactionRequired = false private[akka] var _host: Option[InetSocketAddress] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None @@ -51,6 +52,11 @@ final class ActiveObjectConfiguration { this } + def shutdownCallback(down: String) : ActiveObjectConfiguration = { + _shutdownCallback = Some(new ShutdownCallback(down)) + this + } + def makeTransactionRequired() : ActiveObjectConfiguration = { _transactionRequired = true; this @@ -153,25 +159,25 @@ object ActiveObject extends Logging { private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern def newInstance[T](target: Class[T], timeout: Long): T = - newInstance(target, actorOf(new Dispatcher(false, None)), None, timeout) + newInstance(target, actorOf(new Dispatcher(false)), None, timeout) def newInstance[T](target: Class[T]): T = - newInstance(target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT) + newInstance(target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, timeout) + newInstance(intf, target, actorOf(new Dispatcher(false)), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef): T = - newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT) + newInstance(intf, target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T = - newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) + newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = { - val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks)) + val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback)) if (config._messageDispatcher.isDefined) { actor.dispatcher = config._messageDispatcher.get } @@ -179,7 +185,7 @@ object ActiveObject extends Logging { } def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = { - val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks)) + val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback)) if (config._messageDispatcher.isDefined) { actor.dispatcher = config._messageDispatcher.get } @@ -515,8 +521,6 @@ private[akka] sealed case class AspectInit( def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout) } -// FIXME: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor - /** * AspectWerkz Aspect that is turning POJOs into Active Object. * Is deployed on a 'per-instance' basis. @@ -671,7 +675,7 @@ object Dispatcher { * @author Jonas Bonér */ private[akka] class Dispatcher(transactionalRequired: Boolean, - var restartCallbacks: Option[RestartCallbacks], + var restartCallbacks: Option[RestartCallbacks] = None, var shutdownCallback: Option[ShutdownCallback] = None) extends Actor { import Dispatcher._ @@ -805,12 +809,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, } override def shutdown = { - AspectInitRegistry.unregister(target.get); try { if (zhutdown.isDefined) { zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } - } catch { case e: InvocationTargetException => throw e.getCause } + } catch { + case e: InvocationTargetException => throw e.getCause + } finally { + AspectInitRegistry.unregister(target.get); + } } override def initTransactionalState = { diff --git a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala index 406d463324..97b01c12ce 100644 --- a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala +++ b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala @@ -143,5 +143,13 @@ class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndA case e: Exception => { /* test passed */ } } } + + it("should shutdown non-supervised, non-initialized active object on ActiveObject.stop") { + val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated]) + ActiveObject.stop(obj) + assert(!obj._pre) + assert(!obj._post) + assert(obj._down) + } } } \ No newline at end of file diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd index 9047b7c588..6eb0ec48fa 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd @@ -105,7 +105,7 @@ - + @@ -123,11 +123,23 @@ + + + + + + Shutdown callback method that is called during shut down. + + + + + + diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 4f6ea37148..d86567c5a6 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -1,22 +1,24 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.spring import java.beans.PropertyDescriptor - import java.lang.reflect.Method + +import reflect.BeanProperty + import org.springframework.beans.BeanWrapperImpl import org.springframework.beans.BeanWrapper import org.springframework.beans.BeanUtils -import org.springframework.util.ReflectionUtils -import org.springframework.util.StringUtils import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean -import se.scalablesolutions.akka.actor.ActiveObject -import reflect.BeanProperty -import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks +import org.springframework.util.ReflectionUtils +import org.springframework.util.StringUtils + +import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject} +import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks} import se.scalablesolutions.akka.dispatch.MessageDispatcher import se.scalablesolutions.akka.util.Logging @@ -25,6 +27,7 @@ import se.scalablesolutions.akka.util.Logging * * @author michaelkober * @author Johan Rask + * @author Martin Krasser */ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { import StringReflect._ @@ -36,6 +39,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { @BeanProperty var transactional: Boolean = false @BeanProperty var pre: String = "" @BeanProperty var post: String = "" + @BeanProperty var shutdown: String = "" @BeanProperty var host: String = "" @BeanProperty var port: Int = _ @BeanProperty var lifecycle: String = "" @@ -67,15 +71,20 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { if (hasInterface) argumentList += "i" if (hasDispatcher) argumentList += "d" - setProperties( - create(argumentList)) -} + setProperties(create(argumentList)) + } - /** - * This method manages element by injecting either - * values () and bean references () + /** + * Stop the active object if it is a singleton. */ - private def setProperties(ref:AnyRef) : AnyRef = { + override def destroy = { + if(scope.equals(VAL_SCOPE_SINGLETON)) { + ActiveObject.stop(getObject) + } + super.destroy + } + + private def setProperties(ref:AnyRef) : AnyRef = { log.debug("Processing properties and dependencies for target class %s",target) val beanWrapper = new BeanWrapperImpl(ref); for(entry <- property.entryList) { @@ -97,60 +106,45 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { ref } -// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here) -// -// private[akka] def create(argList : String) : AnyRef = argList match { -// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks) -// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks) -// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) -// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) -// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks) -// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks) -// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks) -// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks) -// } - private[akka] def create(argList : String) : AnyRef = { if (argList == "r") { - ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks) + ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port)) } else if (argList == "ri" ) { - ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, host, port, callbacks) + ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port)) } else if (argList == "rd") { - ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) + ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port).dispatcher(dispatcherInstance)) } else if (argList == "rid") { - ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, host, port, callbacks) + ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port).dispatcher(dispatcherInstance)) } else if (argList == "i") { - ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, callbacks) + ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig) } else if (argList == "id") { - ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, callbacks) + ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.dispatcher(dispatcherInstance)) } else if (argList == "d") { - ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks) + ActiveObject.newInstance(target.toClass, createConfig.dispatcher(dispatcherInstance)) } else { - ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks) + ActiveObject.newInstance(target.toClass, createConfig) } } - def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = { - clazz.newInstance().asInstanceOf[T] - } + private[akka] def createConfig: ActiveObjectConfiguration = { + val config = new ActiveObjectConfiguration().timeout(timeout) + if (hasRestartCallbacks) config.restartCallbacks(pre, post) + if (hasShutdownCallback) config.shutdownCallback(shutdown) + if (transactional) config.makeTransactionRequired + config + } - /** - * create Option[RestartCallback] - */ - private def callbacks: Option[RestartCallbacks] = { - if (hasCallbacks) { - val callbacks = new RestartCallbacks(pre, post) - Some(callbacks) - } else { - None - } + private[akka] def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = { + clazz.newInstance().asInstanceOf[T] } private[akka] def isRemote = (host != null) && (!host.isEmpty) private[akka] def hasInterface = (interface != null) && (!interface.isEmpty) - private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty) + private[akka] def hasRestartCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty) + + private[akka] def hasShutdownCallback = ((shutdown != null) && !shutdown.isEmpty) private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty) diff --git a/akka-spring/src/main/scala/ActiveObjectParser.scala b/akka-spring/src/main/scala/ActiveObjectParser.scala index fc6b372720..8838360a44 100644 --- a/akka-spring/src/main/scala/ActiveObjectParser.scala +++ b/akka-spring/src/main/scala/ActiveObjectParser.scala @@ -13,6 +13,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException * Parser trait for custom namespace configuration for active-object. * @author michaelkober * @author Johan Rask + * @author Martin Krasser */ trait ActiveObjectParser extends BeanParser with DispatcherParser { import AkkaSpringConfigurationTags._ @@ -25,7 +26,8 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser { def parseActiveObject(element: Element): ActiveObjectProperties = { val objectProperties = new ActiveObjectProperties() val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); - val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG); + val restartCallbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG); + val shutdownCallbackElement = DomUtils.getChildElementByTagName(element, SHUTDOWN_CALLBACK_TAG); val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG) @@ -34,14 +36,18 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser { objectProperties.port = mandatory(remoteElement, PORT).toInt } - if (callbacksElement != null) { - objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART) - objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART) + if (restartCallbacksElement != null) { + objectProperties.preRestart = restartCallbacksElement.getAttribute(PRE_RESTART) + objectProperties.postRestart = restartCallbacksElement.getAttribute(POST_RESTART) if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) { throw new IllegalActorStateException("At least one of pre or post must be defined.") } } + if (shutdownCallbackElement != null) { + objectProperties.shutdown = shutdownCallbackElement.getAttribute("method") + } + if (dispatcherElement != null) { val dispatcherProperties = parseDispatcher(dispatcherElement) objectProperties.dispatcher = dispatcherProperties diff --git a/akka-spring/src/main/scala/ActiveObjectProperties.scala b/akka-spring/src/main/scala/ActiveObjectProperties.scala index ba4828e2f9..0f4b09d559 100644 --- a/akka-spring/src/main/scala/ActiveObjectProperties.scala +++ b/akka-spring/src/main/scala/ActiveObjectProperties.scala @@ -10,6 +10,7 @@ import AkkaSpringConfigurationTags._ /** * Data container for active object configuration data. * @author michaelkober + * @author Martin Krasser */ class ActiveObjectProperties { var target: String = "" @@ -18,10 +19,11 @@ class ActiveObjectProperties { var transactional: Boolean = false var preRestart: String = "" var postRestart: String = "" + var shutdown: String = "" var host: String = "" var port: Int = _ var lifecycle: String = "" - var scope:String = "" + var scope:String = VAL_SCOPE_SINGLETON var dispatcher: DispatcherProperties = _ var propertyEntries = new PropertyEntries() @@ -35,6 +37,7 @@ class ActiveObjectProperties { builder.addPropertyValue(PORT, port) builder.addPropertyValue(PRE_RESTART, preRestart) builder.addPropertyValue(POST_RESTART, postRestart) + builder.addPropertyValue(SHUTDOWN, shutdown) builder.addPropertyValue(TIMEOUT, timeout) builder.addPropertyValue(TARGET, target) builder.addPropertyValue(INTERFACE, interface) diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 5e927ceba1..80a9f2e8d0 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring /** * XML configuration tags. * @author michaelkober + * @author Martin Krasser */ object AkkaSpringConfigurationTags { @@ -20,6 +21,7 @@ object AkkaSpringConfigurationTags { // active-object sub tags val RESTART_CALLBACKS_TAG = "restart-callbacks" + val SHUTDOWN_CALLBACK_TAG = "shutdown-callback" val REMOTE_TAG = "remote" // superivision sub tags @@ -45,6 +47,7 @@ object AkkaSpringConfigurationTags { val PORT = "port" val PRE_RESTART = "pre" val POST_RESTART = "post" + val SHUTDOWN = "shutdown" val LIFECYCLE = "lifecycle" val SCOPE = "scope" diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java index 37953173ec..e8adaa38e7 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java @@ -1,9 +1,22 @@ package se.scalablesolutions.akka.spring; +import se.scalablesolutions.akka.actor.annotation.shutdown; + public class SampleBean { + public boolean down; + + public SampleBean() { + down = false; + } + public String foo(String s) { return "hello " + s; } - + + @shutdown + public void shutdown() { + down = true; + } + } diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml index 5648fa2fdf..bcb1a7f525 100644 --- a/akka-spring/src/test/resources/appContext.xml +++ b/akka-spring/src/test/resources/appContext.xml @@ -6,16 +6,19 @@ http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.akkasource.org/schema/akka http://scalablesolutions.se/akka/akka-0.10.xsd"> - - - - - - - - - \ No newline at end of file + + + + + + + + + + + + \ No newline at end of file diff --git a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala index d8bb29ed3e..dc48ecc4b1 100644 --- a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala @@ -54,8 +54,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers { val props = parser.parseActiveObject(dom(xml).getDocumentElement); assert(props != null) - assert(props.dispatcher.dispatcherType == "thread-based") -} + assert(props.dispatcher.dispatcherType === "thread-based") + } it("should parse remote ActiveObjects configuration") { val xml = val props = parser.parseActiveObject(dom(xml).getDocumentElement); assert(props != null) - assert(props.host == "com.some.host") - assert(props.port == 9999) + assert(props.host === "com.some.host") + assert(props.port === 9999) } } } diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index f1c11d0eee..9e1c014ce3 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -70,5 +70,21 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor] assert(target.getSource === "someString") } + + it("should stop the created active object when scope is singleton and the context is closed") { + var ctx = new ClassPathXmlApplicationContext("appContext.xml"); + val target = ctx.getBean("bean-singleton").asInstanceOf[SampleBean] + assert(!target.down) + ctx.close + assert(target.down) + } + + it("should not stop the created active object when scope is prototype and the context is closed") { + var ctx = new ClassPathXmlApplicationContext("appContext.xml"); + val target = ctx.getBean("bean-prototype").asInstanceOf[SampleBean] + assert(!target.down) + ctx.close + assert(!target.down) + } } } diff --git a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala index e0344ae4fb..ffc1f7a95d 100644 --- a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala @@ -49,11 +49,21 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { parser.parseSupervisor(createSupervisorElement, builder); val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActiveObjectProperties]] assert(supervised != null) - expect(3) { supervised.length } + expect(4) { supervised.length } val iterator = supervised.iterator - expect("foo.bar.Foo") { iterator.next.target } - expect("foo.bar.Bar") { iterator.next.target } - expect("foo.bar.MyPojo") { iterator.next.target } + val prop1 = iterator.next + val prop2 = iterator.next + val prop3 = iterator.next + val prop4 = iterator.next + expect("foo.bar.Foo") { prop1.target } + expect("foo.bar.Bar") { prop2.target } + expect("foo.bar.MyPojo") { prop3.target } + expect("foo.bar.MyPojo") { prop4.target } + expect("preRestart") { prop3.preRestart } + expect("postRestart") { prop3.postRestart } + expect("shutdown") { prop4.shutdown } + expect("permanent") { prop1.lifecycle } + expect("temporary") { prop4.lifecycle } } it("should throw IllegalArgumentException on missing mandatory attributes") { @@ -87,6 +97,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { + + + dom(xml).getDocumentElement From b6e0ae479d3d85172d387299fde90dc9f7d35fad Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 6 Jul 2010 08:28:22 +0200 Subject: [PATCH 13/19] cosmetic logging change --- akka-core/src/main/scala/actor/ActorRef.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 447931019a..ff21960d18 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -559,14 +559,14 @@ trait ActorRef extends TransactionManagement { x => Scheduler.unschedule(x) _timeoutActor = None - log.debug("Timeout canceled") + log.debug("Timeout canceled for %s", this) } } protected [akka] def checkReceiveTimeout = { cancelReceiveTimeout receiveTimeout.foreach { timeout => - log.debug("Scheduling timeout for Actor [" + toString + "]") + log.debug("Scheduling timeout for %s", this) _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS)) } } From 16e246a9b809bfffa6754a9a996a590af51599b5 Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Tue, 6 Jul 2010 11:49:30 +0200 Subject: [PATCH 14/19] #301 DI does not work in akka-spring when specifying an interface --- .../main/scala/ActiveObjectFactoryBean.scala | 37 ++++++++++++++----- .../scalablesolutions/akka/spring/Pojo.java | 27 ++++++++++++++ .../akka/spring/PojoInf.java | 8 ++++ .../akka/spring/SampleBean.java | 12 +----- akka-spring/src/test/resources/appContext.xml | 10 ++++- .../scala/ActiveObjectFactoryBeanTest.scala | 11 +++--- 6 files changed, 78 insertions(+), 27 deletions(-) create mode 100644 akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java create mode 100644 akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index bc4286cec1..5b7a62a0b5 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -48,6 +48,20 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w @BeanProperty var scope:String = VAL_SCOPE_SINGLETON @BeanProperty var property:PropertyEntries = _ @BeanProperty var applicationContext:ApplicationContext = _ + + // Holds info about if deps has been set or not. Depends on + // if interface is specified or not. We must set deps on + // target instance if interface is specified + var hasSetDependecies = false + + + override def isSingleton:Boolean = { + if(scope.equals(VAL_SCOPE_SINGLETON)) { + true + } else { + false + } + } /* * @see org.springframework.beans.factory.FactoryBean#getObjectType() @@ -63,11 +77,6 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() */ def createInstance: AnyRef = { - if(scope.equals(VAL_SCOPE_SINGLETON)) { - setSingleton(true) - } else { - setSingleton(false) - } var argumentList = "" if (isRemote) argumentList += "r" if (hasInterface) argumentList += "i" @@ -86,7 +95,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w super.destroy } - private def setProperties(ref:AnyRef) : AnyRef = { + private def setProperties(ref:AnyRef) : AnyRef = { + if(hasSetDependecies) { + return ref + } + log.debug("Processing properties and dependencies for target class %s",target) val beanWrapper = new BeanWrapperImpl(ref); if(ref.isInstanceOf[ApplicationContextAware]) { @@ -109,6 +122,8 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w throw new AkkaBeansException("Either property@ref or property@value must be set on property element") } } + //un-set so next bean can be managed + hasSetDependecies = false ref } @@ -139,10 +154,12 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w if (transactional) config.makeTransactionRequired config } - - private[akka] def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = { - clazz.newInstance().asInstanceOf[T] - } + def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = { + var ref = clazz.newInstance().asInstanceOf[T] + setProperties(ref) + hasSetDependecies = true + ref +} private[akka] def isRemote = (host != null) && (!host.isEmpty) diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java new file mode 100644 index 0000000000..42e1e393e7 --- /dev/null +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java @@ -0,0 +1,27 @@ +package se.scalablesolutions.akka.spring; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +public class Pojo implements PojoInf,ApplicationContextAware { + + private String string; + + private boolean gotApplicationContext = false; + + public boolean gotApplicationContext() { + return gotApplicationContext; + } + public void setApplicationContext(ApplicationContext context) { + gotApplicationContext = true; + } + + public void setString(String s) { + string = s; + } + + public String getString() { + return string; + } + + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java new file mode 100644 index 0000000000..db31841f17 --- /dev/null +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java @@ -0,0 +1,8 @@ +package se.scalablesolutions.akka.spring; + +public interface PojoInf { + + public String getString(); + public boolean gotApplicationContext(); + + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java index ce8cf1fd70..e8adaa38e7 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java @@ -1,23 +1,13 @@ package se.scalablesolutions.akka.spring; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationContext; - import se.scalablesolutions.akka.actor.annotation.shutdown; -public class SampleBean implements ApplicationContextAware { +public class SampleBean { public boolean down; - public boolean gotApplicationContext; - public SampleBean() { down = false; - gotApplicationContext = false; - } - - public void setApplicationContext(ApplicationContext context) { - gotApplicationContext = true; } public String foo(String s) { diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml index b299b3e363..e9a651b735 100644 --- a/akka-spring/src/test/resources/appContext.xml +++ b/akka-spring/src/test/resources/appContext.xml @@ -22,4 +22,12 @@ - + + + + + diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index e0db33e8f1..5055fb6184 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -65,14 +65,15 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { assert(target.getSource === entry.value) } - it("should create an application context and inject a string dependency") { + it("should create an application context and verify dependency injection") { var ctx = new ClassPathXmlApplicationContext("appContext.xml"); val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor] assert(target.getSource === "someString") - - val sampleBean = ctx.getBean("sample").asInstanceOf[SampleBean]; - Thread.sleep(300) - assert(sampleBean.gotApplicationContext) + + val pojoInf = ctx.getBean("pojoInf").asInstanceOf[PojoInf]; + println("pojoInf = " + pojoInf.getString) + assert(pojoInf.getString == "akka rocks") + assert(pojoInf.gotApplicationContext) } it("should stop the created active object when scope is singleton and the context is closed") { From 07660475d3a537414756506e212c9489775dbd28 Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Tue, 6 Jul 2010 13:07:25 +0200 Subject: [PATCH 15/19] Minor change, overriding destroyInstance instead of destroy --- .../src/main/scala/ActiveObjectFactoryBean.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 5b7a62a0b5..03f0f50df3 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -85,15 +85,12 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w setProperties(create(argumentList)) } - /** + /** * Stop the active object if it is a singleton. */ - override def destroy = { - if(scope.equals(VAL_SCOPE_SINGLETON)) { - ActiveObject.stop(getObject) - } - super.destroy - } + override def destroyInstance(instance:AnyRef) { + ActiveObject.stop(instance) + } private def setProperties(ref:AnyRef) : AnyRef = { if(hasSetDependecies) { From a890ccdaf4789d4e78383f4b5b108637414b032c Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Wed, 7 Jul 2010 07:02:23 +0200 Subject: [PATCH 16/19] Closes #318: Race condition between ActorRef.cancelReceiveTimeout and ActorRegistry.shutdownAll --- akka-core/src/main/scala/actor/ActorRef.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index ff21960d18..d14c3af29a 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -557,7 +557,7 @@ trait ActorRef extends TransactionManagement { protected[akka] def cancelReceiveTimeout = { _timeoutActor.foreach { x => - Scheduler.unschedule(x) + if (x.isRunning) Scheduler.unschedule(x) _timeoutActor = None log.debug("Timeout canceled for %s", this) } From 2e7847c0585fa76e53d954b52d2d45da5b3c0e25 Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Wed, 7 Jul 2010 11:12:16 +0200 Subject: [PATCH 17/19] Added support for springs @PostConstruct and @PreDestroy --- .../main/scala/ActiveObjectFactoryBean.scala | 37 +++++++++++++++---- .../scalablesolutions/akka/spring/Pojo.java | 24 +++++++++++- .../akka/spring/PojoInf.java | 12 +++++- .../scala/ActiveObjectFactoryBeanTest.scala | 4 ++ 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 03f0f50df3..e379041322 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -6,7 +6,8 @@ package se.scalablesolutions.akka.spring import java.beans.PropertyDescriptor import java.lang.reflect.Method - +import javax.annotation.PreDestroy +import javax.annotation.PostConstruct import reflect.BeanProperty import org.springframework.beans.BeanWrapperImpl @@ -82,16 +83,37 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w if (hasInterface) argumentList += "i" if (hasDispatcher) argumentList += "d" - setProperties(create(argumentList)) + postConstruct( + setProperties( + create(argumentList))) + } /** * Stop the active object if it is a singleton. + * It will call the instance destroy method before + * stopping the active object. */ override def destroyInstance(instance:AnyRef) { + for(method <- instance.getClass.getMethods) { + if(method.isAnnotationPresent(classOf[PreDestroy])) { + method.invoke(instance) + } + } ActiveObject.stop(instance) } - + + private def postConstruct(ref:AnyRef) : AnyRef = { + // Invoke postConstruct method if any + for(method <- ref.getClass.getMethods) { + if(method.isAnnotationPresent(classOf[PostConstruct])) { + method.invoke(ref) + } + } + ref + } + + private def setProperties(ref:AnyRef) : AnyRef = { if(hasSetDependecies) { return ref @@ -119,8 +141,6 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w throw new AkkaBeansException("Either property@ref or property@value must be set on property element") } } - //un-set so next bean can be managed - hasSetDependecies = false ref } @@ -144,6 +164,8 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w } } + + private[akka] def createConfig: ActiveObjectConfiguration = { val config = new ActiveObjectConfiguration().timeout(timeout) if (hasRestartCallbacks) config.restartCallbacks(pre, post) @@ -153,10 +175,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w } def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = { var ref = clazz.newInstance().asInstanceOf[T] - setProperties(ref) + postConstruct( + setProperties(ref)) hasSetDependecies = true ref -} + } private[akka] def isRemote = (host != null) && (!host.isEmpty) diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java index 42e1e393e7..3df506a290 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java @@ -2,12 +2,16 @@ package se.scalablesolutions.akka.spring; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import javax.annotation.PreDestroy; +import javax.annotation.PostConstruct; public class Pojo implements PojoInf,ApplicationContextAware { private String string; private boolean gotApplicationContext = false; + private boolean preDestroyInvoked = false; + private boolean postConstructInvoked = false; public boolean gotApplicationContext() { return gotApplicationContext; @@ -23,5 +27,23 @@ public class Pojo implements PojoInf,ApplicationContextAware { public String getString() { return string; } - + + @PreDestroy + public void destroy(){ + preDestroyInvoked = true; + } + @PostConstruct + public void create() { + postConstructInvoked = true; + } + + + public boolean isPreDestroyInvoked() { + return preDestroyInvoked; + } + + public boolean isPostConstructInvoked() { + return postConstructInvoked; +} + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java index db31841f17..c4e3e55c47 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java @@ -1,8 +1,18 @@ package se.scalablesolutions.akka.spring; +import javax.annotation.PreDestroy; +import javax.annotation.PostConstruct; + public interface PojoInf { public String getString(); public boolean gotApplicationContext(); - + public boolean isPreDestroyInvoked(); + public boolean isPostConstructInvoked(); + + @PreDestroy + public void destroy(); + + @PostConstruct + public void create(); } diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index 5055fb6184..6a0b9701f6 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -72,8 +72,12 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { val pojoInf = ctx.getBean("pojoInf").asInstanceOf[PojoInf]; println("pojoInf = " + pojoInf.getString) + Thread.sleep(200) + assert(pojoInf.isPostConstructInvoked) assert(pojoInf.getString == "akka rocks") assert(pojoInf.gotApplicationContext) + ctx.close + assert(pojoInf.isPreDestroyInvoked) } it("should stop the created active object when scope is singleton and the context is closed") { From aa0459ecd7459ee3357c9b3ef141848872ceecb3 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Wed, 7 Jul 2010 11:27:18 +0200 Subject: [PATCH 18/19] Dropped akka.xsd, updated all spring XML configurations to use akka-0.10.xsd --- .../akka/spring/foo/dispatcher-config.xml | 12 +- .../akka/spring/foo/supervisor-config.xml | 12 +- .../akka/spring/foo/test-config.xml | 14 +- .../se/scalablesolutions/akka/spring/akka.xsd | 240 ------------------ 4 files changed, 22 insertions(+), 256 deletions(-) delete mode 100644 akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml index 3339b07199..688d04f377 100644 --- a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml +++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml @@ -1,11 +1,13 @@ + xmlns:akka="http://www.akkasource.org/schema/akka" + xmlns:beans="http://www.springframework.org/schema/lang" + xsi:schemaLocation=" +http://www.springframework.org/schema/beans +http://www.springframework.org/schema/beans/spring-beans-2.0.xsd +http://www.akkasource.org/schema/akka +http://scalablesolutions.se/akka/akka-0.10.xsd"> + xmlns:akka="http://www.akkasource.org/schema/akka" + xmlns:beans="http://www.springframework.org/schema/lang" + xsi:schemaLocation=" +http://www.springframework.org/schema/beans +http://www.springframework.org/schema/beans/spring-beans-2.0.xsd +http://www.akkasource.org/schema/akka +http://scalablesolutions.se/akka/akka-0.10.xsd"> diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml index ef9dce1930..665d03a05e 100644 --- a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml +++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml @@ -1,11 +1,13 @@ + xmlns:akka="http://www.akkasource.org/schema/akka" + xmlns:beans="http://www.springframework.org/schema/lang" + xsi:schemaLocation=" +http://www.springframework.org/schema/beans +http://www.springframework.org/schema/beans/spring-beans-2.0.xsd +http://www.akkasource.org/schema/akka +http://scalablesolutions.se/akka/akka-0.10.xsd"> - + diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd deleted file mode 100644 index 862cd06987..0000000000 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd +++ /dev/null @@ -1,240 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Name of the remote host. - - - - - - - Port of the remote host. - - - - - - - - - - - Pre restart callback method that is called during restart. - - - - - - - Post restart callback method that is called during restart. - - - - - - - - - - - - - - - - - - Name of the target class. - - - - - - - default timeout for '!!' invocations - - - - - - - Set to true if messages should have REQUIRES_NEW semantics - - - - - - - Interface implemented by target class. - - - - - - - Lifecycle, permanent or temporary - - - - - - - Supported scopes are singleton and prototype - - - - - - - - - - - - - - - - - - - - - - - - - - - - Failover scheme, AllForOne or OneForOne - - - - - - - Maximal number of retries. - - - - - - - Timerange for restart. - - - - - - - - - - - - - - - - - - - - - - - - - - From 7da92cbd4e3e32eca715eae0d3efc408e8008b1c Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Wed, 7 Jul 2010 13:31:50 +0200 Subject: [PATCH 19/19] removed @PreDestroy functionality --- .../src/main/scala/ActiveObjectFactoryBean.scala | 16 ++++++++-------- .../se/scalablesolutions/akka/spring/Pojo.java | 10 ---------- .../scalablesolutions/akka/spring/PojoInf.java | 4 ---- .../test/scala/ActiveObjectFactoryBeanTest.scala | 2 -- 4 files changed, 8 insertions(+), 24 deletions(-) diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index e379041322..6f62c5a8c4 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -91,18 +91,18 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w /** * Stop the active object if it is a singleton. - * It will call the instance destroy method before - * stopping the active object. */ override def destroyInstance(instance:AnyRef) { - for(method <- instance.getClass.getMethods) { - if(method.isAnnotationPresent(classOf[PreDestroy])) { - method.invoke(instance) - } - } ActiveObject.stop(instance) } - + + /** + * Invokes any method annotated with @PostConstruct + * When interfaces are specified, this method is invoked both on the + * target instance and on the active object, so a developer is free do decide + * where the annotation should be. If no interface is specified it is only invoked + * on the active object + */ private def postConstruct(ref:AnyRef) : AnyRef = { // Invoke postConstruct method if any for(method <- ref.getClass.getMethods) { diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java index 3df506a290..04995b75c8 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java @@ -10,7 +10,6 @@ public class Pojo implements PojoInf,ApplicationContextAware { private String string; private boolean gotApplicationContext = false; - private boolean preDestroyInvoked = false; private boolean postConstructInvoked = false; public boolean gotApplicationContext() { @@ -28,19 +27,10 @@ public class Pojo implements PojoInf,ApplicationContextAware { return string; } - @PreDestroy - public void destroy(){ - preDestroyInvoked = true; - } @PostConstruct public void create() { postConstructInvoked = true; } - - - public boolean isPreDestroyInvoked() { - return preDestroyInvoked; - } public boolean isPostConstructInvoked() { return postConstructInvoked; diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java index c4e3e55c47..70d64245db 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java @@ -7,12 +7,8 @@ public interface PojoInf { public String getString(); public boolean gotApplicationContext(); - public boolean isPreDestroyInvoked(); public boolean isPostConstructInvoked(); - @PreDestroy - public void destroy(); - @PostConstruct public void create(); } diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index 6a0b9701f6..68dac8e97c 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -76,8 +76,6 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { assert(pojoInf.isPostConstructInvoked) assert(pojoInf.getString == "akka rocks") assert(pojoInf.gotApplicationContext) - ctx.close - assert(pojoInf.isPreDestroyInvoked) } it("should stop the created active object when scope is singleton and the context is closed") {