diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala
index 953ac90746..8d29739f02 100644
--- a/akka-camel/src/main/scala/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/ConsumerPublisher.scala
@@ -47,6 +47,18 @@ private[camel] object ConsumerPublisher extends Logging {
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
+
+ /**
+ * Stops route to the already un-registered consumer actor method.
+ */
+ def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
+ val targetMethod = event.method.getName
+ val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
+
+ CamelContextManager.activeObjectRegistry.remove(objectId)
+ CamelContextManager.context.stopRoute(objectId)
+ log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.activeObject, event.uri))
+ }
}
/**
@@ -76,8 +88,12 @@ private[camel] class ConsumerPublisher extends Actor {
handleConsumerUnregistered(u)
latch.countDown // needed for testing only.
}
- case d: ConsumerMethodRegistered => {
- handleConsumerMethodRegistered(d)
+ case mr: ConsumerMethodRegistered => {
+ handleConsumerMethodRegistered(mr)
+ latch.countDown // needed for testing only.
+ }
+ case mu: ConsumerMethodUnregistered => {
+ handleConsumerMethodUnregistered(mu)
latch.countDown // needed for testing only.
}
case SetExpectedMessageCount(num) => {
@@ -94,7 +110,6 @@ private[camel] class ConsumerPublisher extends Actor {
*/
private[camel] case class SetExpectedMessageCount(num: Int)
-
/**
* Defines an abstract route to a target which is either an actor or an active object method..
*
@@ -171,6 +186,8 @@ private[camel] class PublishRequestor extends Actor {
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
+ case AspectInitUnregistered(proxy, init) =>
+ for (event <- ConsumerMethodUnregistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case PublishRequestorInit(pub) => {
publisher = Some(pub)
deliverBufferedEvents
@@ -244,6 +261,20 @@ private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String,
*/
private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
+/**
+ * Event indicating that an active object has been stopped. For each
+ * @consume annotated POJO method a separate instance of this class is
+ * created.
+ *
+ * @param activeObject active object (proxy).
+ * @param init
+ * @param uri endpoint URI of the active object method
+ * @param method method to be un-published.
+ *
+ * @author Martin Krasser
+ */
+private[camel] case class ConsumerMethodUnregistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
+
/**
* @author Martin Krasser
*/
@@ -272,6 +303,26 @@ private[camel] object ConsumerUnregistered {
}
}
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerMethod {
+ /**
+ * Applies a function f to each consumer method of activeObject and
+ * returns the function results as a list. A consumer method is one that is annotated with
+ * @consume. If activeObject is a proxy for a remote active object
+ * f is never called and Nil is returned.
+ */
+ def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = {
+ // TODO: support consumer annotation inheritance
+ // - visit overridden methods in superclasses
+ // - visit implemented method declarations in interfaces
+ if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
+ else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
+ yield f(m)
+ }
+}
+
/**
* @author Martin Krasser
*/
@@ -282,12 +333,22 @@ private[camel] object ConsumerMethodRegistered {
* have any @consume annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
- // TODO: support consumer annotation inheritance
- // - visit overridden methods in superclasses
- // - visit implemented method declarations in interfaces
- if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
- else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
- yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
+ m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ }
+ }
+}
+
+private[camel] object ConsumerMethodUnregistered {
+ /**
+ * Creates a list of ConsumerMethodUnregistered event messages for an active object or an empty
+ * list if the active object is a proxy for an remote active object or the active object doesn't
+ * have any @consume annotated methods.
+ */
+ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
+ ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
+ m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ }
}
}
diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
index 771ed83af3..1e88b62bf2 100644
--- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
+++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
@@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
feature("Unpublish registered consumer actor from the global CamelContext") {
- scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") {
+ scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
@@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
- then("the direct endpoint falls back to its default behaviour and returns the original message")
+ then("the direct-endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
}
}
@@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
- val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
- ActiveObject.newInstance(classOf[PojoBase])
+ var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
@@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
+
+ // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
+ latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ ActiveObject.stop(obj)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ }
+ }
+
+ feature("Unpublish active object method from the global CamelContext") {
+
+ scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
+
+ given("an active object that has been stopped")
+ var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ val obj = ActiveObject.newInstance(classOf[PojoBase])
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ ActiveObject.stop(obj)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ when("requests are sent to published methods")
+ val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
+ val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
+ val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
+
+ then("the direct-endpoints fall back to their default behaviour and return the original message")
+ assert(response1 === "x")
+ assert(response2 === "x")
+ assert(response3 === "x")
}
}
}
diff --git a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala
index 493030fa4a..7c28c7d8ee 100644
--- a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala
+++ b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala
@@ -2,21 +2,19 @@ package se.scalablesolutions.akka.camel
import java.net.InetSocketAddress
-import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
+import org.junit.{AfterClass, Test}
class ConsumerMethodRegisteredTest extends JUnitSuite {
+ import ConsumerMethodRegisteredTest._
+
val remoteAddress = new InetSocketAddress("localhost", 8888);
val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
val localAspectInit = AspectInit(classOf[String], null, None, 1000)
- val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
- val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
- val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
-
val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
r1.method.getName < r2.method.getName
@@ -44,3 +42,16 @@ class ConsumerMethodRegisteredTest extends JUnitSuite {
}
}
+
+object ConsumerMethodRegisteredTest {
+ val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
+ val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
+ val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
+
+ @AfterClass
+ def afterClass = {
+ ActiveObject.stop(activePojoBase)
+ ActiveObject.stop(activePojoSub)
+ ActiveObject.stop(activePojoIntf)
+ }
+}
diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala
index 7729e6eec6..44c6c30684 100644
--- a/akka-camel/src/test/scala/PublishRequestorTest.scala
+++ b/akka-camel/src/test/scala/PublishRequestorTest.scala
@@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite {
assert(event.method.getName === "foo")
}
+ @Test def shouldReceiveConsumerMethodUnregisteredEvent = {
+ val obj = ActiveObject.newInstance(classOf[PojoSingle])
+ val init = AspectInit(classOf[PojoSingle], null, None, 1000)
+ val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
+ requestor ! AspectInitUnregistered(obj, init)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered]
+ assert(event.init === init)
+ assert(event.uri === "direct:foo")
+ assert(event.activeObject === obj)
+ assert(event.method.getName === "foo")
+ }
+
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer)
diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala
index 149585df4b..7e3b666590 100644
--- a/akka-camel/src/test/scala/RemoteConsumerTest.scala
+++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala
@@ -55,14 +55,13 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
}
}
- /* TODO: enable once issues with remote active objects are resolved
feature("Client-initiated remote consumer active object") {
scenario("access published remote consumer method") {
given("a client-initiated remote consumer active object")
val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
when("remote consumer publication is triggered")
- val latch = service.consumerPublisher.!).get
+ val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
consumer.foo("init")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
@@ -71,7 +70,6 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
assert(response === "remote active object: test")
}
}
- */
}
object RemoteConsumerTest {
diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java
new file mode 100644
index 0000000000..f806e7bca6
--- /dev/null
+++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java
@@ -0,0 +1,14 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.actor.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface shutdown {}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index fc70441fae..a545f9f633 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -25,6 +25,7 @@ object Annotations {
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
+ val shutdown = classOf[shutdown]
val inittransactionalstate = classOf[inittransactionalstate]
}
@@ -36,6 +37,7 @@ object Annotations {
final class ActiveObjectConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _restartCallbacks: Option[RestartCallbacks] = None
+ private[akka] var _shutdownCallback: Option[ShutdownCallback] = None
private[akka] var _transactionRequired = false
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
@@ -50,6 +52,11 @@ final class ActiveObjectConfiguration {
this
}
+ def shutdownCallback(down: String) : ActiveObjectConfiguration = {
+ _shutdownCallback = Some(new ShutdownCallback(down))
+ this
+ }
+
def makeTransactionRequired() : ActiveObjectConfiguration = {
_transactionRequired = true;
this
@@ -152,25 +159,25 @@ object ActiveObject extends Logging {
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
- newInstance(target, actorOf(new Dispatcher(false, None)), None, timeout)
+ newInstance(target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](target: Class[T]): T =
- newInstance(target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
+ newInstance(target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
- newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, timeout)
+ newInstance(intf, target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef): T =
- newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
+ newInstance(intf, target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
- newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout)
+ newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T =
- newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
+ newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = {
- val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
+ val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@@ -178,7 +185,7 @@ object ActiveObject extends Logging {
}
def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = {
- val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
+ val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@@ -358,7 +365,6 @@ object ActiveObject extends Logging {
val proxy = Proxy.newInstance(target, true, false)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
- ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@@ -371,7 +377,6 @@ object ActiveObject extends Logging {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
- ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@@ -379,6 +384,11 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
+ def stop(obj: AnyRef): Unit = {
+ val init = AspectInitRegistry.initFor(obj)
+ init.actorRef.stop
+ }
+
/**
* Get the underlying dispatcher actor for the given active object.
*/
@@ -483,9 +493,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
- val init = initializations.get(target)
- initializations.remove(target)
- init
+ initializations.get(target)
}
def register(target: AnyRef, init: AspectInit) = {
@@ -493,10 +501,17 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
foreachListener(_ ! AspectInitRegistered(target, init))
res
}
+
+ def unregister(target: AnyRef) = {
+ val res = initializations.remove(target)
+ foreachListener(_ ! AspectInitUnregistered(target, res))
+ res
+ }
}
private[akka] sealed trait AspectInitRegistryEvent
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
+private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] sealed case class AspectInit(
val target: Class[_],
@@ -506,8 +521,6 @@ private[akka] sealed case class AspectInit(
def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout)
}
-// FIXME: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
-
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@@ -517,6 +530,7 @@ private[akka] sealed case class AspectInit(
@Aspect("perInstance")
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
+ @volatile private var isStopped = false
private var target: Class[_] = _
private var actorRef: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _
@@ -547,7 +561,11 @@ private[akka] sealed class ActiveObjectAspect {
val isOneWay = isVoid(rtti)
val sender = ActiveObjectContext.sender.value
val senderFuture = ActiveObjectContext.senderFuture.value
- if (isOneWay) {
+
+ if (!actorRef.isRunning && !isStopped) {
+ isStopped = true
+ joinPoint.proceed
+ } else if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
null.asInstanceOf[AnyRef]
} else {
@@ -656,10 +674,13 @@ object Dispatcher {
*
* @author Jonas Bonér
*/
-private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor {
+private[akka] class Dispatcher(transactionalRequired: Boolean,
+ var restartCallbacks: Option[RestartCallbacks] = None,
+ var shutdownCallback: Option[ShutdownCallback] = None) extends Actor {
import Dispatcher._
private[actor] var target: Option[AnyRef] = None
+ private var zhutdown: Option[Method] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
@@ -681,7 +702,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
val methods = targetInstance.getClass.getDeclaredMethods.toList
// See if we have any config define restart callbacks
- callbacks match {
+ restartCallbacks match {
case None => {}
case Some(RestartCallbacks(pre, post)) =>
preRestart = Some(try {
@@ -695,10 +716,22 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
"Could not find post restart method [" + post + "] \nin [" +
targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
+ // See if we have any config define a shutdown callback
+ shutdownCallback match {
+ case None => {}
+ case Some(ShutdownCallback(down)) =>
+ zhutdown = Some(try {
+ targetInstance.getClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*)
+ } catch { case e => throw new IllegalStateException(
+ "Could not find shutdown method [" + down + "] \nin [" +
+ targetClass.getName + "]. \nIt must have a zero argument definition.") })
+ }
// See if we have any annotation defined restart callbacks
if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
+ // See if we have an annotation defined shutdown callback
+ if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown))
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
throw new IllegalActorStateException(
@@ -708,9 +741,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
throw new IllegalActorStateException(
"Method annotated with @postrestart or defined as a restart callback in \n[" +
targetClass.getName + "] must have a zero argument definition")
+ if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0)
+ throw new IllegalStateException(
+ "Method annotated with @shutdown or defined as a shutdown callback in \n[" +
+ targetClass.getName + "] must have a zero argument definition")
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
+ if (zhutdown.isDefined) zhutdown.get.setAccessible(true)
// see if we have a method annotated with @inittransactionalstate, if so invoke it
initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
@@ -770,6 +808,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
}
}
+ override def shutdown = {
+ try {
+ if (zhutdown.isDefined) {
+ zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
+ }
+ } catch {
+ case e: InvocationTargetException => throw e.getCause
+ } finally {
+ AspectInitRegistry.unregister(target.get);
+ }
+ }
+
override def initTransactionalState = {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 33bf9bf998..b38401a4a6 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
- val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
@@ -435,7 +434,6 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
- cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalActorStateException(
@@ -443,7 +441,7 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
- case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
+ case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@@ -451,25 +449,6 @@ trait Actor extends Logging {
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
-
- @volatile protected[akka] var timeoutActor: Option[ActorRef] = None
-
- private[akka] def cancelReceiveTimeout = {
- timeoutActor.foreach {
- x =>
- Scheduler.unschedule(x)
- timeoutActor = None
- log.debug("Timeout canceled")
- }
- }
-
- private[akka] def checkReceiveTimeout = {
- //if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes
- if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) {
- log.debug("Scheduling timeout for Actor [" + toString + "]")
- timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
- }
- }
}
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index c412672aa7..d14c3af29a 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import RemoteActorSerialization._
import com.google.protobuf.ByteString
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement {
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
+
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
*
* Defines the default timeout for an initial receive invocation.
- * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
+ * When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
- @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
+ @volatile var receiveTimeout: Option[Long] = None
/**
* User overridable callback/setting.
@@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement {
}
override def toString = "Actor[" + id + ":" + uuid + "]"
+
+ protected[akka] def cancelReceiveTimeout = {
+ _timeoutActor.foreach {
+ x =>
+ if (x.isRunning) Scheduler.unschedule(x)
+ _timeoutActor = None
+ log.debug("Timeout canceled for %s", this)
+ }
+ }
+
+ protected [akka] def checkReceiveTimeout = {
+ cancelReceiveTimeout
+ receiveTimeout.foreach { timeout =>
+ log.debug("Scheduling timeout for %s", this)
+ _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS))
+ }
+ }
+
}
/**
@@ -734,8 +754,9 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down the actor its dispatcher and message queue.
*/
- def stop = guard.withGuard {
+ def stop() = guard.withGuard {
if (isRunning) {
+ cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
@@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
try {
+ cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
@@ -1057,7 +1079,7 @@ sealed class LocalActorRef private[akka](
val failedActor = actorInstance.get
failedActor.synchronized {
lifeCycle.get match {
- case LifeCycle(scope, _) => {
+ case LifeCycle(scope, _, _) => {
scope match {
case Permanent =>
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
@@ -1086,7 +1108,7 @@ sealed class LocalActorRef private[akka](
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
- case LifeCycle(scope, _) => {
+ case LifeCycle(scope, _, _) => {
scope match {
case Permanent => actorRef.restart(reason)
case Temporary => shutDownTemporaryActor(actorRef)
@@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
- actor.checkReceiveTimeout
+ checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
@@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] (
this
}
- def stop(): Unit = {
+ def stop: Unit = {
_isRunning = false
_isShutDown = true
}
@@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
- def makeTransactionRequired(): Unit = unsupported
+ def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
@@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] (
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
- def shutdownLinkedActors(): Unit = unsupported
+ def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala
index 54ee27034e..d549bb8c80 100644
--- a/akka-core/src/main/scala/actor/SerializationProtocol.scala
+++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala
@@ -85,10 +85,10 @@ object ActorSerialization {
}
val builder = LifeCycleProtocol.newBuilder
a.lifeCycle match {
- case Some(LifeCycle(scope, None)) =>
+ case Some(LifeCycle(scope, None, _)) =>
setScope(builder, scope)
Some(builder.build)
- case Some(LifeCycle(scope, Some(callbacks))) =>
+ case Some(LifeCycle(scope, Some(callbacks), _)) =>
setScope(builder, scope)
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 3beac783a7..54174b6030 100644
--- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -82,7 +82,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
- val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
+ val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
+ component.lifeCycle.restartCallbacks,
+ component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
@@ -99,7 +101,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
- val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
+ val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
+ component.lifeCycle.restartCallbacks,
+ component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala
index 7e03fcb6f8..1f5fd15a9b 100644
--- a/akka-core/src/main/scala/config/SupervisionConfig.scala
+++ b/akka-core/src/main/scala/config/SupervisionConfig.scala
@@ -42,13 +42,15 @@ object ScalaConfig {
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
- case class LifeCycle(scope: Scope, callbacks: Option[RestartCallbacks]) extends ConfigElement
- object LifeCycle {
- def apply(scope: Scope) = new LifeCycle(scope, None)
- }
+ case class LifeCycle(scope: Scope,
+ restartCallbacks: Option[RestartCallbacks] = None,
+ shutdownCallback: Option[ShutdownCallback] = None) extends ConfigElement
case class RestartCallbacks(preRestart: String, postRestart: String) {
if ((preRestart eq null) || (postRestart eq null)) throw new IllegalArgumentException("Restart callback methods can't be null")
}
+ case class ShutdownCallback(shutdown: String) {
+ if (shutdown eq null) throw new IllegalArgumentException("Shutdown callback method can't be null")
+ }
case object Permanent extends Scope
case object Temporary extends Scope
@@ -135,17 +137,25 @@ object JavaConfig {
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
- class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement {
- def this(scope: Scope) = this(scope, null)
+ class LifeCycle(@BeanProperty val scope: Scope,
+ @BeanProperty val restartCallbacks: RestartCallbacks,
+ @BeanProperty val shutdownCallback: ShutdownCallback) extends ConfigElement {
+ def this(scope: Scope) = this(scope, null, null)
+ def this(scope: Scope, restartCallbacks: RestartCallbacks) = this(scope, restartCallbacks, null)
+ def this(scope: Scope, shutdownCallback: ShutdownCallback) = this(scope, null, shutdownCallback)
def transform = {
- val callbackOption = if (callbacks eq null) None else Some(callbacks.transform)
- se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, callbackOption)
+ val restartCallbacksOption = if (restartCallbacks eq null) None else Some(restartCallbacks.transform)
+ val shutdownCallbackOption = if (shutdownCallback eq null) None else Some(shutdownCallback.transform)
+ se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, restartCallbacksOption, shutdownCallbackOption)
}
}
class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) {
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks(preRestart, postRestart)
}
+ class ShutdownCallback(@BeanProperty val shutdown: String) {
+ def transform = se.scalablesolutions.akka.config.ScalaConfig.ShutdownCallback(shutdown)
+ }
abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java
new file mode 100644
index 0000000000..50f3e43221
--- /dev/null
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java
@@ -0,0 +1,37 @@
+package se.scalablesolutions.akka.actor;
+
+import java.util.concurrent.CountDownLatch;
+
+public class SamplePojo {
+
+ private CountDownLatch latch;
+
+ public boolean _pre = false;
+ public boolean _post = false;
+ public boolean _down = false;
+
+ public CountDownLatch newCountdownLatch(int count) {
+ latch = new CountDownLatch(count);
+ return latch;
+ }
+
+ public String fail() {
+ throw new RuntimeException("expected");
+ }
+
+ public void pre() {
+ _pre = true;
+ latch.countDown();
+ }
+
+ public void post() {
+ _post = true;
+ latch.countDown();
+ }
+
+ public void down() {
+ _down = true;
+ latch.countDown();
+ }
+
+}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java
new file mode 100644
index 0000000000..8bf4ba36d3
--- /dev/null
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoAnnotated.java
@@ -0,0 +1,52 @@
+package se.scalablesolutions.akka.actor;
+
+import se.scalablesolutions.akka.actor.annotation.postrestart;
+import se.scalablesolutions.akka.actor.annotation.prerestart;
+import se.scalablesolutions.akka.actor.annotation.shutdown;
+
+import java.util.concurrent.CountDownLatch;
+
+public class SamplePojoAnnotated {
+
+ private CountDownLatch latch;
+
+ public boolean _pre = false;
+ public boolean _post = false;
+ public boolean _down = false;
+
+ public SamplePojoAnnotated() {
+ latch = new CountDownLatch(1);
+ }
+
+ public CountDownLatch newCountdownLatch(int count) {
+ latch = new CountDownLatch(count);
+ return latch;
+ }
+
+ public String greet(String s) {
+ return "hello " + s;
+ }
+
+ public String fail() {
+ throw new RuntimeException("expected");
+ }
+
+ @prerestart
+ public void pre() {
+ _pre = true;
+ latch.countDown();
+ }
+
+ @postrestart
+ public void post() {
+ _post = true;
+ latch.countDown();
+ }
+
+ @shutdown
+ public void down() {
+ _down = true;
+ latch.countDown();
+ }
+
+}
\ No newline at end of file
diff --git a/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala
new file mode 100644
index 0000000000..97b01c12ce
--- /dev/null
+++ b/akka-core/src/test/scala/ActiveObjectLifecycleSpec.scala
@@ -0,0 +1,155 @@
+package se.scalablesolutions.akka.actor
+
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, Spec}
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.matchers.ShouldMatchers
+
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator
+import se.scalablesolutions.akka.config.JavaConfig._
+
+/**
+ * @author Martin Krasser
+ */
+@RunWith(classOf[JUnitRunner])
+class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
+ var conf1: ActiveObjectConfigurator = _
+ var conf2: ActiveObjectConfigurator = _
+ var conf3: ActiveObjectConfigurator = _
+ var conf4: ActiveObjectConfigurator = _
+
+ override protected def beforeAll() = {
+ val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
+ val comp1 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Permanent()), 1000)
+ val comp2 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Temporary()), 1000)
+ val comp3 = new Component(classOf[SamplePojo], new LifeCycle(new Permanent(), new RestartCallbacks("pre", "post")), 1000)
+ val comp4 = new Component(classOf[SamplePojo], new LifeCycle(new Temporary(), new ShutdownCallback("down")), 1000)
+ conf1 = new ActiveObjectConfigurator().configure(strategy, Array(comp1)).supervise
+ conf2 = new ActiveObjectConfigurator().configure(strategy, Array(comp2)).supervise
+ conf3 = new ActiveObjectConfigurator().configure(strategy, Array(comp3)).supervise
+ conf4 = new ActiveObjectConfigurator().configure(strategy, Array(comp4)).supervise
+ }
+
+ override protected def afterAll() = {
+ conf1.stop
+ conf2.stop
+ conf3.stop
+ conf4.stop
+ }
+
+ describe("ActiveObject lifecycle management") {
+ it("should restart supervised, annotated active object on failure") {
+ val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
+ val cdl = obj.newCountdownLatch(2)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(obj._pre)
+ assert(obj._post)
+ assert(!obj._down)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ }
+ }
+ }
+
+ it("should shutdown supervised, annotated active object on failure") {
+ val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
+ val cdl = obj.newCountdownLatch(1)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(!obj._pre)
+ assert(!obj._post)
+ assert(obj._down)
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ }
+ }
+ }
+
+ it("should restart supervised, non-annotated active object on failure") {
+ val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo])
+ val cdl = obj.newCountdownLatch(2)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(obj._pre)
+ assert(obj._post)
+ assert(!obj._down)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ }
+ }
+ }
+
+ it("should shutdown supervised, non-annotated active object on failure") {
+ val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo])
+ val cdl = obj.newCountdownLatch(1)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(!obj._pre)
+ assert(!obj._post)
+ assert(obj._down)
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ }
+ }
+ }
+
+ it("should shutdown non-supervised, annotated active object on ActiveObject.stop") {
+ val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ assert("hello akka" === obj.greet("akka"))
+ ActiveObject.stop(obj)
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ assert(!obj._pre)
+ assert(!obj._post)
+ assert(obj._down)
+ try {
+ obj.greet("akka")
+ fail("access to stopped active object")
+ } catch {
+ case e: Exception => { /* test passed */ }
+ }
+ }
+
+ it("should shutdown non-supervised, annotated active object on ActorRegistry.shutdownAll") {
+ val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ assert("hello akka" === obj.greet("akka"))
+ ActorRegistry.shutdownAll
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ assert(!obj._pre)
+ assert(!obj._post)
+ assert(obj._down)
+ try {
+ obj.greet("akka")
+ fail("access to stopped active object")
+ } catch {
+ case e: Exception => { /* test passed */ }
+ }
+ }
+
+ it("should shutdown non-supervised, non-initialized active object on ActiveObject.stop") {
+ val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
+ ActiveObject.stop(obj)
+ assert(!obj._pre)
+ assert(!obj._post)
+ assert(obj._down)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala
index 938cb08d43..5c50337894 100644
--- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala
+++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala
@@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = 500
+ self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = 500
+ self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = 500
+ self.receiveTimeout = Some(500L)
protected def receive = {
case Tick => ()
@@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
- assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false)
+ assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
+ }
+
+ @Test def timeoutShouldNotBeSentWhenNotSpecified = {
+ val timeoutLatch = new StandardLatch
+ val timeoutActor = actorOf(new Actor {
+
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
+
+ assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
}
}
diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml
index 3339b07199..688d04f377 100644
--- a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml
+++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml
@@ -1,11 +1,13 @@
+ xmlns:akka="http://www.akkasource.org/schema/akka"
+ xmlns:beans="http://www.springframework.org/schema/lang"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans
+http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+http://www.akkasource.org/schema/akka
+http://scalablesolutions.se/akka/akka-0.10.xsd">
+ xmlns:akka="http://www.akkasource.org/schema/akka"
+ xmlns:beans="http://www.springframework.org/schema/lang"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans
+http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+http://www.akkasource.org/schema/akka
+http://scalablesolutions.se/akka/akka-0.10.xsd">
diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml
index ef9dce1930..665d03a05e 100644
--- a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml
+++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml
@@ -1,11 +1,13 @@
+ xmlns:akka="http://www.akkasource.org/schema/akka"
+ xmlns:beans="http://www.springframework.org/schema/lang"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans
+http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+http://www.akkasource.org/schema/akka
+http://scalablesolutions.se/akka/akka-0.10.xsd">
-
+
diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
index 9047b7c588..6eb0ec48fa 100644
--- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
+++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
@@ -105,7 +105,7 @@
-
+
@@ -123,11 +123,23 @@
+
+
+
+
+
+ Shutdown callback method that is called during shut down.
+
+
+
+
+
+
diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd
deleted file mode 100644
index 862cd06987..0000000000
--- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd
+++ /dev/null
@@ -1,240 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Name of the remote host.
-
-
-
-
-
-
- Port of the remote host.
-
-
-
-
-
-
-
-
-
-
- Pre restart callback method that is called during restart.
-
-
-
-
-
-
- Post restart callback method that is called during restart.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Name of the target class.
-
-
-
-
-
-
- default timeout for '!!' invocations
-
-
-
-
-
-
- Set to true if messages should have REQUIRES_NEW semantics
-
-
-
-
-
-
- Interface implemented by target class.
-
-
-
-
-
-
- Lifecycle, permanent or temporary
-
-
-
-
-
-
- Supported scopes are singleton and prototype
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Failover scheme, AllForOne or OneForOne
-
-
-
-
-
-
- Maximal number of retries.
-
-
-
-
-
-
- Timerange for restart.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala
index 4f6ea37148..6f62c5a8c4 100644
--- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala
+++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala
@@ -1,22 +1,26 @@
/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
+ * Copyright (C) 2009-2010 Scalable Solutions AB
*/
package se.scalablesolutions.akka.spring
import java.beans.PropertyDescriptor
-
import java.lang.reflect.Method
+import javax.annotation.PreDestroy
+import javax.annotation.PostConstruct
+import reflect.BeanProperty
+
import org.springframework.beans.BeanWrapperImpl
import org.springframework.beans.BeanWrapper
import org.springframework.beans.BeanUtils
-import org.springframework.util.ReflectionUtils
-import org.springframework.util.StringUtils
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
-import se.scalablesolutions.akka.actor.ActiveObject
-import reflect.BeanProperty
-import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks
+import org.springframework.context.{ApplicationContext,ApplicationContextAware}
+import org.springframework.util.ReflectionUtils
+import org.springframework.util.StringUtils
+
+import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject}
+import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.Logging
@@ -25,8 +29,9 @@ import se.scalablesolutions.akka.util.Logging
*
* @author michaelkober
* @author Johan Rask
+ * @author Martin Krasser
*/
-class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
+class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@@ -36,12 +41,28 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
@BeanProperty var transactional: Boolean = false
@BeanProperty var pre: String = ""
@BeanProperty var post: String = ""
+ @BeanProperty var shutdown: String = ""
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope:String = VAL_SCOPE_SINGLETON
@BeanProperty var property:PropertyEntries = _
+ @BeanProperty var applicationContext:ApplicationContext = _
+
+ // Holds info about if deps has been set or not. Depends on
+ // if interface is specified or not. We must set deps on
+ // target instance if interface is specified
+ var hasSetDependecies = false
+
+
+ override def isSingleton:Boolean = {
+ if(scope.equals(VAL_SCOPE_SINGLETON)) {
+ true
+ } else {
+ false
+ }
+ }
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
@@ -57,28 +78,54 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: AnyRef = {
- if(scope.equals(VAL_SCOPE_SINGLETON)) {
- setSingleton(true)
- } else {
- setSingleton(false)
- }
var argumentList = ""
if (isRemote) argumentList += "r"
if (hasInterface) argumentList += "i"
if (hasDispatcher) argumentList += "d"
- setProperties(
- create(argumentList))
-}
+ postConstruct(
+ setProperties(
+ create(argumentList)))
+
+ }
/**
- * This method manages element by injecting either
- * values () and bean references ()
+ * Stop the active object if it is a singleton.
*/
+ override def destroyInstance(instance:AnyRef) {
+ ActiveObject.stop(instance)
+ }
+
+ /**
+ * Invokes any method annotated with @PostConstruct
+ * When interfaces are specified, this method is invoked both on the
+ * target instance and on the active object, so a developer is free do decide
+ * where the annotation should be. If no interface is specified it is only invoked
+ * on the active object
+ */
+ private def postConstruct(ref:AnyRef) : AnyRef = {
+ // Invoke postConstruct method if any
+ for(method <- ref.getClass.getMethods) {
+ if(method.isAnnotationPresent(classOf[PostConstruct])) {
+ method.invoke(ref)
+ }
+ }
+ ref
+ }
+
+
private def setProperties(ref:AnyRef) : AnyRef = {
+ if(hasSetDependecies) {
+ return ref
+ }
+
log.debug("Processing properties and dependencies for target class %s",target)
val beanWrapper = new BeanWrapperImpl(ref);
- for(entry <- property.entryList) {
+ if(ref.isInstanceOf[ApplicationContextAware]) {
+ log.debug("Setting application context")
+ beanWrapper.setPropertyValue("applicationContext",applicationContext)
+ }
+ for(entry <- property.entryList) {
val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name)
val method = propertyDescriptor.getWriteMethod();
@@ -97,60 +144,50 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
ref
}
-// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here)
-//
-// private[akka] def create(argList : String) : AnyRef = argList match {
-// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
-// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
-// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
-// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
-// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
-// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
-// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
-// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
-// }
-
private[akka] def create(argList : String) : AnyRef = {
if (argList == "r") {
- ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
+ ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port))
} else if (argList == "ri" ) {
- ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, host, port, callbacks)
+ ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port))
} else if (argList == "rd") {
- ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
+ ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "rid") {
- ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, host, port, callbacks)
+ ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "i") {
- ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, callbacks)
+ ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig)
} else if (argList == "id") {
- ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, callbacks)
+ ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.dispatcher(dispatcherInstance))
} else if (argList == "d") {
- ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
+ ActiveObject.newInstance(target.toClass, createConfig.dispatcher(dispatcherInstance))
} else {
- ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
+ ActiveObject.newInstance(target.toClass, createConfig)
}
}
- def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
- clazz.newInstance().asInstanceOf[T]
- }
- /**
- * create Option[RestartCallback]
- */
- private def callbacks: Option[RestartCallbacks] = {
- if (hasCallbacks) {
- val callbacks = new RestartCallbacks(pre, post)
- Some(callbacks)
- } else {
- None
- }
+
+ private[akka] def createConfig: ActiveObjectConfiguration = {
+ val config = new ActiveObjectConfiguration().timeout(timeout)
+ if (hasRestartCallbacks) config.restartCallbacks(pre, post)
+ if (hasShutdownCallback) config.shutdownCallback(shutdown)
+ if (transactional) config.makeTransactionRequired
+ config
+ }
+ def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
+ var ref = clazz.newInstance().asInstanceOf[T]
+ postConstruct(
+ setProperties(ref))
+ hasSetDependecies = true
+ ref
}
private[akka] def isRemote = (host != null) && (!host.isEmpty)
private[akka] def hasInterface = (interface != null) && (!interface.isEmpty)
- private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
+ private[akka] def hasRestartCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
+
+ private[akka] def hasShutdownCallback = ((shutdown != null) && !shutdown.isEmpty)
private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty)
diff --git a/akka-spring/src/main/scala/ActiveObjectParser.scala b/akka-spring/src/main/scala/ActiveObjectParser.scala
index fc6b372720..8838360a44 100644
--- a/akka-spring/src/main/scala/ActiveObjectParser.scala
+++ b/akka-spring/src/main/scala/ActiveObjectParser.scala
@@ -13,6 +13,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException
* Parser trait for custom namespace configuration for active-object.
* @author michaelkober
* @author Johan Rask
+ * @author Martin Krasser
*/
trait ActiveObjectParser extends BeanParser with DispatcherParser {
import AkkaSpringConfigurationTags._
@@ -25,7 +26,8 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
def parseActiveObject(element: Element): ActiveObjectProperties = {
val objectProperties = new ActiveObjectProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
- val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
+ val restartCallbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
+ val shutdownCallbackElement = DomUtils.getChildElementByTagName(element, SHUTDOWN_CALLBACK_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
@@ -34,14 +36,18 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
objectProperties.port = mandatory(remoteElement, PORT).toInt
}
- if (callbacksElement != null) {
- objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART)
- objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART)
+ if (restartCallbacksElement != null) {
+ objectProperties.preRestart = restartCallbacksElement.getAttribute(PRE_RESTART)
+ objectProperties.postRestart = restartCallbacksElement.getAttribute(POST_RESTART)
if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) {
throw new IllegalActorStateException("At least one of pre or post must be defined.")
}
}
+ if (shutdownCallbackElement != null) {
+ objectProperties.shutdown = shutdownCallbackElement.getAttribute("method")
+ }
+
if (dispatcherElement != null) {
val dispatcherProperties = parseDispatcher(dispatcherElement)
objectProperties.dispatcher = dispatcherProperties
diff --git a/akka-spring/src/main/scala/ActiveObjectProperties.scala b/akka-spring/src/main/scala/ActiveObjectProperties.scala
index ba4828e2f9..0f4b09d559 100644
--- a/akka-spring/src/main/scala/ActiveObjectProperties.scala
+++ b/akka-spring/src/main/scala/ActiveObjectProperties.scala
@@ -10,6 +10,7 @@ import AkkaSpringConfigurationTags._
/**
* Data container for active object configuration data.
* @author michaelkober
+ * @author Martin Krasser
*/
class ActiveObjectProperties {
var target: String = ""
@@ -18,10 +19,11 @@ class ActiveObjectProperties {
var transactional: Boolean = false
var preRestart: String = ""
var postRestart: String = ""
+ var shutdown: String = ""
var host: String = ""
var port: Int = _
var lifecycle: String = ""
- var scope:String = ""
+ var scope:String = VAL_SCOPE_SINGLETON
var dispatcher: DispatcherProperties = _
var propertyEntries = new PropertyEntries()
@@ -35,6 +37,7 @@ class ActiveObjectProperties {
builder.addPropertyValue(PORT, port)
builder.addPropertyValue(PRE_RESTART, preRestart)
builder.addPropertyValue(POST_RESTART, postRestart)
+ builder.addPropertyValue(SHUTDOWN, shutdown)
builder.addPropertyValue(TIMEOUT, timeout)
builder.addPropertyValue(TARGET, target)
builder.addPropertyValue(INTERFACE, interface)
diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
index 5e927ceba1..80a9f2e8d0 100644
--- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
+++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
@@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
/**
* XML configuration tags.
* @author michaelkober
+ * @author Martin Krasser
*/
object AkkaSpringConfigurationTags {
@@ -20,6 +21,7 @@ object AkkaSpringConfigurationTags {
// active-object sub tags
val RESTART_CALLBACKS_TAG = "restart-callbacks"
+ val SHUTDOWN_CALLBACK_TAG = "shutdown-callback"
val REMOTE_TAG = "remote"
// superivision sub tags
@@ -45,6 +47,7 @@ object AkkaSpringConfigurationTags {
val PORT = "port"
val PRE_RESTART = "pre"
val POST_RESTART = "post"
+ val SHUTDOWN = "shutdown"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java
new file mode 100644
index 0000000000..04995b75c8
--- /dev/null
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/Pojo.java
@@ -0,0 +1,39 @@
+package se.scalablesolutions.akka.spring;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import javax.annotation.PreDestroy;
+import javax.annotation.PostConstruct;
+
+public class Pojo implements PojoInf,ApplicationContextAware {
+
+ private String string;
+
+ private boolean gotApplicationContext = false;
+ private boolean postConstructInvoked = false;
+
+ public boolean gotApplicationContext() {
+ return gotApplicationContext;
+ }
+ public void setApplicationContext(ApplicationContext context) {
+ gotApplicationContext = true;
+ }
+
+ public void setString(String s) {
+ string = s;
+ }
+
+ public String getString() {
+ return string;
+ }
+
+ @PostConstruct
+ public void create() {
+ postConstructInvoked = true;
+ }
+
+ public boolean isPostConstructInvoked() {
+ return postConstructInvoked;
+}
+
+ }
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java
new file mode 100644
index 0000000000..70d64245db
--- /dev/null
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/PojoInf.java
@@ -0,0 +1,14 @@
+package se.scalablesolutions.akka.spring;
+
+import javax.annotation.PreDestroy;
+import javax.annotation.PostConstruct;
+
+public interface PojoInf {
+
+ public String getString();
+ public boolean gotApplicationContext();
+ public boolean isPostConstructInvoked();
+
+ @PostConstruct
+ public void create();
+ }
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java
index 37953173ec..e8adaa38e7 100644
--- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java
@@ -1,9 +1,22 @@
package se.scalablesolutions.akka.spring;
+import se.scalablesolutions.akka.actor.annotation.shutdown;
+
public class SampleBean {
+ public boolean down;
+
+ public SampleBean() {
+ down = false;
+ }
+
public String foo(String s) {
return "hello " + s;
}
-
+
+ @shutdown
+ public void shutdown() {
+ down = true;
+ }
+
}
diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml
index 5648fa2fdf..e9a651b735 100644
--- a/akka-spring/src/test/resources/appContext.xml
+++ b/akka-spring/src/test/resources/appContext.xml
@@ -6,16 +6,28 @@
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
-
-
-
-
-
-
-
-
-
\ No newline at end of file
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala
index d8bb29ed3e..dc48ecc4b1 100644
--- a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala
+++ b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala
@@ -54,8 +54,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
- assert(props.dispatcher.dispatcherType == "thread-based")
-}
+ assert(props.dispatcher.dispatcherType === "thread-based")
+ }
it("should parse remote ActiveObjects configuration") {
val xml =
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
- assert(props.host == "com.some.host")
- assert(props.port == 9999)
+ assert(props.host === "com.some.host")
+ assert(props.port === 9999)
}
}
}
diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala
index f1c11d0eee..68dac8e97c 100644
--- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala
+++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala
@@ -65,10 +65,33 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
assert(target.getSource === entry.value)
}
- it("should create an application context and inject a string dependency") {
+ it("should create an application context and verify dependency injection") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
assert(target.getSource === "someString")
+
+ val pojoInf = ctx.getBean("pojoInf").asInstanceOf[PojoInf];
+ println("pojoInf = " + pojoInf.getString)
+ Thread.sleep(200)
+ assert(pojoInf.isPostConstructInvoked)
+ assert(pojoInf.getString == "akka rocks")
+ assert(pojoInf.gotApplicationContext)
+ }
+
+ it("should stop the created active object when scope is singleton and the context is closed") {
+ var ctx = new ClassPathXmlApplicationContext("appContext.xml");
+ val target = ctx.getBean("bean-singleton").asInstanceOf[SampleBean]
+ assert(!target.down)
+ ctx.close
+ assert(target.down)
+ }
+
+ it("should not stop the created active object when scope is prototype and the context is closed") {
+ var ctx = new ClassPathXmlApplicationContext("appContext.xml");
+ val target = ctx.getBean("bean-prototype").asInstanceOf[SampleBean]
+ assert(!target.down)
+ ctx.close
+ assert(!target.down)
}
}
}
diff --git a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala
index e0344ae4fb..ffc1f7a95d 100644
--- a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala
+++ b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala
@@ -49,11 +49,21 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
parser.parseSupervisor(createSupervisorElement, builder);
val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActiveObjectProperties]]
assert(supervised != null)
- expect(3) { supervised.length }
+ expect(4) { supervised.length }
val iterator = supervised.iterator
- expect("foo.bar.Foo") { iterator.next.target }
- expect("foo.bar.Bar") { iterator.next.target }
- expect("foo.bar.MyPojo") { iterator.next.target }
+ val prop1 = iterator.next
+ val prop2 = iterator.next
+ val prop3 = iterator.next
+ val prop4 = iterator.next
+ expect("foo.bar.Foo") { prop1.target }
+ expect("foo.bar.Bar") { prop2.target }
+ expect("foo.bar.MyPojo") { prop3.target }
+ expect("foo.bar.MyPojo") { prop4.target }
+ expect("preRestart") { prop3.preRestart }
+ expect("postRestart") { prop3.postRestart }
+ expect("shutdown") { prop4.shutdown }
+ expect("permanent") { prop1.lifecycle }
+ expect("temporary") { prop4.lifecycle }
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
@@ -87,6 +97,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
+
+
+
dom(xml).getDocumentElement