Merge branch 'master' of github.com:jboner/akka into wip_141_SSL_enable_remote_actors

This commit is contained in:
Viktor Klang 2010-07-07 22:24:47 +02:00
commit 5220bbc6c5
32 changed files with 815 additions and 426 deletions

View file

@ -47,6 +47,18 @@ private[camel] object ConsumerPublisher extends Logging {
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
/**
* Stops route to the already un-registered consumer actor method.
*/
def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.activeObjectRegistry.remove(objectId)
CamelContextManager.context.stopRoute(objectId)
log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
}
/**
@ -76,8 +88,12 @@ private[camel] class ConsumerPublisher extends Actor {
handleConsumerUnregistered(u)
latch.countDown // needed for testing only.
}
case d: ConsumerMethodRegistered => {
handleConsumerMethodRegistered(d)
case mr: ConsumerMethodRegistered => {
handleConsumerMethodRegistered(mr)
latch.countDown // needed for testing only.
}
case mu: ConsumerMethodUnregistered => {
handleConsumerMethodUnregistered(mu)
latch.countDown // needed for testing only.
}
case SetExpectedMessageCount(num) => {
@ -94,7 +110,6 @@ private[camel] class ConsumerPublisher extends Actor {
*/
private[camel] case class SetExpectedMessageCount(num: Int)
/**
* Defines an abstract route to a target which is either an actor or an active object method..
*
@ -171,6 +186,8 @@ private[camel] class PublishRequestor extends Actor {
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case AspectInitUnregistered(proxy, init) =>
for (event <- ConsumerMethodUnregistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case PublishRequestorInit(pub) => {
publisher = Some(pub)
deliverBufferedEvents
@ -244,6 +261,20 @@ private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String,
*/
private[camel] case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
/**
* Event indicating that an active object has been stopped. For each
* <code>@consume</code> annotated POJO method a separate instance of this class is
* created.
*
* @param activeObject active object (proxy).
* @param init
* @param uri endpoint URI of the active object method
* @param method method to be un-published.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerMethodUnregistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
/**
* @author Martin Krasser
*/
@ -272,6 +303,26 @@ private[camel] object ConsumerUnregistered {
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerMethod {
/**
* Applies a function <code>f</code> to each consumer method of <code>activeObject</code> and
* returns the function results as a list. A consumer method is one that is annotated with
* <code>@consume</code>. If <code>activeObject</code> is a proxy for a remote active object
* <code>f</code> is never called and <code>Nil</code> is returned.
*/
def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield f(m)
}
}
/**
* @author Martin Krasser
*/
@ -282,12 +333,22 @@ private[camel] object ConsumerMethodRegistered {
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
private[camel] object ConsumerMethodUnregistered {
/**
* Creates a list of ConsumerMethodUnregistered event messages for an active object or an empty
* list if the active object is a proxy for an remote active object or the active object doesn't
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}

View file

@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
feature("Unpublish registered consumer actor from the global CamelContext") {
scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") {
scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
then("the direct endpoint falls back to its default behaviour and returns the original message")
then("the direct-endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
}
}
@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.newInstance(classOf[PojoBase])
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
// cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
}
}
feature("Unpublish active object method from the global CamelContext") {
scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
given("an active object that has been stopped")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("the direct-endpoints fall back to their default behaviour and return the original message")
assert(response1 === "x")
assert(response2 === "x")
assert(response3 === "x")
}
}
}

View file

@ -2,21 +2,19 @@ package se.scalablesolutions.akka.camel
import java.net.InetSocketAddress
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
import org.junit.{AfterClass, Test}
class ConsumerMethodRegisteredTest extends JUnitSuite {
import ConsumerMethodRegisteredTest._
val remoteAddress = new InetSocketAddress("localhost", 8888);
val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
val localAspectInit = AspectInit(classOf[String], null, None, 1000)
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
r1.method.getName < r2.method.getName
@ -44,3 +42,16 @@ class ConsumerMethodRegisteredTest extends JUnitSuite {
}
}
object ConsumerMethodRegisteredTest {
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
@AfterClass
def afterClass = {
ActiveObject.stop(activePojoBase)
ActiveObject.stop(activePojoSub)
ActiveObject.stop(activePojoIntf)
}
}

View file

@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite {
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerMethodUnregisteredEvent = {
val obj = ActiveObject.newInstance(classOf[PojoSingle])
val init = AspectInit(classOf[PojoSingle], null, None, 1000)
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! AspectInitUnregistered(obj, init)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered]
assert(event.init === init)
assert(event.uri === "direct:foo")
assert(event.activeObject === obj)
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer)

View file

@ -55,14 +55,13 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
}
}
/* TODO: enable once issues with remote active objects are resolved
feature("Client-initiated remote consumer active object") {
scenario("access published remote consumer method") {
given("a client-initiated remote consumer active object")
val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port)
when("remote consumer publication is triggered")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get
val latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
consumer.foo("init")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
@ -71,7 +70,6 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
assert(response === "remote active object: test")
}
}
*/
}
object RemoteConsumerTest {

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface shutdown {}

View file

@ -25,6 +25,7 @@ object Annotations {
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
val shutdown = classOf[shutdown]
val inittransactionalstate = classOf[inittransactionalstate]
}
@ -36,6 +37,7 @@ object Annotations {
final class ActiveObjectConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _restartCallbacks: Option[RestartCallbacks] = None
private[akka] var _shutdownCallback: Option[ShutdownCallback] = None
private[akka] var _transactionRequired = false
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
@ -50,6 +52,11 @@ final class ActiveObjectConfiguration {
this
}
def shutdownCallback(down: String) : ActiveObjectConfiguration = {
_shutdownCallback = Some(new ShutdownCallback(down))
this
}
def makeTransactionRequired() : ActiveObjectConfiguration = {
_transactionRequired = true;
this
@ -152,25 +159,25 @@ object ActiveObject extends Logging {
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, actorOf(new Dispatcher(false, None)), None, timeout)
newInstance(target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](target: Class[T]): T =
newInstance(target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
newInstance(target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, timeout)
newInstance(intf, target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef): T =
newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
newInstance(intf, target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T =
newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = {
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@ -178,7 +185,7 @@ object ActiveObject extends Logging {
}
def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = {
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@ -358,7 +365,6 @@ object ActiveObject extends Logging {
val proxy = Proxy.newInstance(target, true, false)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@ -371,7 +377,6 @@ object ActiveObject extends Logging {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@ -379,6 +384,11 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
def stop(obj: AnyRef): Unit = {
val init = AspectInitRegistry.initFor(obj)
init.actorRef.stop
}
/**
* Get the underlying dispatcher actor for the given active object.
*/
@ -483,9 +493,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
val init = initializations.get(target)
initializations.remove(target)
init
initializations.get(target)
}
def register(target: AnyRef, init: AspectInit) = {
@ -493,10 +501,17 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
foreachListener(_ ! AspectInitRegistered(target, init))
res
}
def unregister(target: AnyRef) = {
val res = initializations.remove(target)
foreachListener(_ ! AspectInitUnregistered(target, res))
res
}
}
private[akka] sealed trait AspectInitRegistryEvent
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] sealed case class AspectInit(
val target: Class[_],
@ -506,8 +521,6 @@ private[akka] sealed case class AspectInit(
def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout)
}
// FIXME: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@ -517,6 +530,7 @@ private[akka] sealed case class AspectInit(
@Aspect("perInstance")
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
@volatile private var isStopped = false
private var target: Class[_] = _
private var actorRef: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _
@ -547,7 +561,11 @@ private[akka] sealed class ActiveObjectAspect {
val isOneWay = isVoid(rtti)
val sender = ActiveObjectContext.sender.value
val senderFuture = ActiveObjectContext.senderFuture.value
if (isOneWay) {
if (!actorRef.isRunning && !isStopped) {
isStopped = true
joinPoint.proceed
} else if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
null.asInstanceOf[AnyRef]
} else {
@ -656,10 +674,13 @@ object Dispatcher {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor {
private[akka] class Dispatcher(transactionalRequired: Boolean,
var restartCallbacks: Option[RestartCallbacks] = None,
var shutdownCallback: Option[ShutdownCallback] = None) extends Actor {
import Dispatcher._
private[actor] var target: Option[AnyRef] = None
private var zhutdown: Option[Method] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
@ -681,7 +702,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
val methods = targetInstance.getClass.getDeclaredMethods.toList
// See if we have any config define restart callbacks
callbacks match {
restartCallbacks match {
case None => {}
case Some(RestartCallbacks(pre, post)) =>
preRestart = Some(try {
@ -695,10 +716,22 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
"Could not find post restart method [" + post + "] \nin [" +
targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any config define a shutdown callback
shutdownCallback match {
case None => {}
case Some(ShutdownCallback(down)) =>
zhutdown = Some(try {
targetInstance.getClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*)
} catch { case e => throw new IllegalStateException(
"Could not find shutdown method [" + down + "] \nin [" +
targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
// See if we have an annotation defined shutdown callback
if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown))
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
throw new IllegalActorStateException(
@ -708,9 +741,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
throw new IllegalActorStateException(
"Method annotated with @postrestart or defined as a restart callback in \n[" +
targetClass.getName + "] must have a zero argument definition")
if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0)
throw new IllegalStateException(
"Method annotated with @shutdown or defined as a shutdown callback in \n[" +
targetClass.getName + "] must have a zero argument definition")
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
if (zhutdown.isDefined) zhutdown.get.setAccessible(true)
// see if we have a method annotated with @inittransactionalstate, if so invoke it
initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
@ -770,6 +808,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
}
}
override def shutdown = {
try {
if (zhutdown.isDefined) {
zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch {
case e: InvocationTargetException => throw e.getCause
} finally {
AspectInitRegistry.unregister(target.get);
}
}
override def initTransactionalState = {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)

View file

@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
@ -435,7 +434,6 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalActorStateException(
@ -443,7 +441,7 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@ -451,25 +449,6 @@ trait Actor extends Logging {
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
@volatile protected[akka] var timeoutActor: Option[ActorRef] = None
private[akka] def cancelReceiveTimeout = {
timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
timeoutActor = None
log.debug("Timeout canceled")
}
}
private[akka] def checkReceiveTimeout = {
//if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes
if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) {
log.debug("Scheduling timeout for Actor [" + toString + "]")
timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
}
}
}
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {

View file

@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import RemoteActorSerialization._
import com.google.protobuf.ByteString
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement {
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
@volatile var receiveTimeout: Option[Long] = None
/**
* User overridable callback/setting.
@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement {
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def cancelReceiveTimeout = {
_timeoutActor.foreach {
x =>
if (x.isRunning) Scheduler.unschedule(x)
_timeoutActor = None
log.debug("Timeout canceled for %s", this)
}
}
protected [akka] def checkReceiveTimeout = {
cancelReceiveTimeout
receiveTimeout.foreach { timeout =>
log.debug("Scheduling timeout for %s", this)
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS))
}
}
}
/**
@ -734,8 +754,9 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = guard.withGuard {
def stop() = guard.withGuard {
if (isRunning) {
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
@ -1057,7 +1079,7 @@ sealed class LocalActorRef private[akka](
val failedActor = actorInstance.get
failedActor.synchronized {
lifeCycle.get match {
case LifeCycle(scope, _) => {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent =>
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
@ -1086,7 +1108,7 @@ sealed class LocalActorRef private[akka](
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
case LifeCycle(scope, _) => {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent => actorRef.restart(reason)
case Temporary => shutDownTemporaryActor(actorRef)
@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
actor.checkReceiveTimeout
checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] (
this
}
def stop(): Unit = {
def stop: Unit = {
_isRunning = false
_isShutDown = true
}
@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired(): Unit = unsupported
def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] (
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors(): Unit = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -85,10 +85,10 @@ object ActorSerialization {
}
val builder = LifeCycleProtocol.newBuilder
a.lifeCycle match {
case Some(LifeCycle(scope, None)) =>
case Some(LifeCycle(scope, None, _)) =>
setScope(builder, scope)
Some(builder.build)
case Some(LifeCycle(scope, Some(callbacks))) =>
case Some(LifeCycle(scope, Some(callbacks), _)) =>
setScope(builder, scope)
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)

View file

@ -82,7 +82,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
component.lifeCycle.restartCallbacks,
component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
@ -99,7 +101,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
component.lifeCycle.restartCallbacks,
component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)

View file

@ -42,13 +42,15 @@ object ScalaConfig {
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, callbacks: Option[RestartCallbacks]) extends ConfigElement
object LifeCycle {
def apply(scope: Scope) = new LifeCycle(scope, None)
}
case class LifeCycle(scope: Scope,
restartCallbacks: Option[RestartCallbacks] = None,
shutdownCallback: Option[ShutdownCallback] = None) extends ConfigElement
case class RestartCallbacks(preRestart: String, postRestart: String) {
if ((preRestart eq null) || (postRestart eq null)) throw new IllegalArgumentException("Restart callback methods can't be null")
}
case class ShutdownCallback(shutdown: String) {
if (shutdown eq null) throw new IllegalArgumentException("Shutdown callback method can't be null")
}
case object Permanent extends Scope
case object Temporary extends Scope
@ -135,17 +137,25 @@ object JavaConfig {
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement {
def this(scope: Scope) = this(scope, null)
class LifeCycle(@BeanProperty val scope: Scope,
@BeanProperty val restartCallbacks: RestartCallbacks,
@BeanProperty val shutdownCallback: ShutdownCallback) extends ConfigElement {
def this(scope: Scope) = this(scope, null, null)
def this(scope: Scope, restartCallbacks: RestartCallbacks) = this(scope, restartCallbacks, null)
def this(scope: Scope, shutdownCallback: ShutdownCallback) = this(scope, null, shutdownCallback)
def transform = {
val callbackOption = if (callbacks eq null) None else Some(callbacks.transform)
se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, callbackOption)
val restartCallbacksOption = if (restartCallbacks eq null) None else Some(restartCallbacks.transform)
val shutdownCallbackOption = if (shutdownCallback eq null) None else Some(shutdownCallback.transform)
se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, restartCallbacksOption, shutdownCallbackOption)
}
}
class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) {
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks(preRestart, postRestart)
}
class ShutdownCallback(@BeanProperty val shutdown: String) {
def transform = se.scalablesolutions.akka.config.ScalaConfig.ShutdownCallback(shutdown)
}
abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope

View file

@ -0,0 +1,37 @@
package se.scalablesolutions.akka.actor;
import java.util.concurrent.CountDownLatch;
public class SamplePojo {
private CountDownLatch latch;
public boolean _pre = false;
public boolean _post = false;
public boolean _down = false;
public CountDownLatch newCountdownLatch(int count) {
latch = new CountDownLatch(count);
return latch;
}
public String fail() {
throw new RuntimeException("expected");
}
public void pre() {
_pre = true;
latch.countDown();
}
public void post() {
_post = true;
latch.countDown();
}
public void down() {
_down = true;
latch.countDown();
}
}

View file

@ -0,0 +1,52 @@
package se.scalablesolutions.akka.actor;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.shutdown;
import java.util.concurrent.CountDownLatch;
public class SamplePojoAnnotated {
private CountDownLatch latch;
public boolean _pre = false;
public boolean _post = false;
public boolean _down = false;
public SamplePojoAnnotated() {
latch = new CountDownLatch(1);
}
public CountDownLatch newCountdownLatch(int count) {
latch = new CountDownLatch(count);
return latch;
}
public String greet(String s) {
return "hello " + s;
}
public String fail() {
throw new RuntimeException("expected");
}
@prerestart
public void pre() {
_pre = true;
latch.countDown();
}
@postrestart
public void post() {
_post = true;
latch.countDown();
}
@shutdown
public void down() {
_down = true;
latch.countDown();
}
}

View file

@ -0,0 +1,155 @@
package se.scalablesolutions.akka.actor
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, Spec}
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
import se.scalablesolutions.akka.config.JavaConfig._
/**
* @author Martin Krasser
*/
@RunWith(classOf[JUnitRunner])
class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
var conf1: ActiveObjectConfigurator = _
var conf2: ActiveObjectConfigurator = _
var conf3: ActiveObjectConfigurator = _
var conf4: ActiveObjectConfigurator = _
override protected def beforeAll() = {
val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
val comp1 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Permanent()), 1000)
val comp2 = new Component(classOf[SamplePojoAnnotated], new LifeCycle(new Temporary()), 1000)
val comp3 = new Component(classOf[SamplePojo], new LifeCycle(new Permanent(), new RestartCallbacks("pre", "post")), 1000)
val comp4 = new Component(classOf[SamplePojo], new LifeCycle(new Temporary(), new ShutdownCallback("down")), 1000)
conf1 = new ActiveObjectConfigurator().configure(strategy, Array(comp1)).supervise
conf2 = new ActiveObjectConfigurator().configure(strategy, Array(comp2)).supervise
conf3 = new ActiveObjectConfigurator().configure(strategy, Array(comp3)).supervise
conf4 = new ActiveObjectConfigurator().configure(strategy, Array(comp4)).supervise
}
override protected def afterAll() = {
conf1.stop
conf2.stop
conf3.stop
conf4.stop
}
describe("ActiveObject lifecycle management") {
it("should restart supervised, annotated active object on failure") {
val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
val cdl = obj.newCountdownLatch(2)
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail
fail("expected exception not thrown")
} catch {
case e: RuntimeException => {
cdl.await
assert(obj._pre)
assert(obj._post)
assert(!obj._down)
assert(AspectInitRegistry.initFor(obj) ne null)
}
}
}
it("should shutdown supervised, annotated active object on failure") {
val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
val cdl = obj.newCountdownLatch(1)
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail
fail("expected exception not thrown")
} catch {
case e: RuntimeException => {
cdl.await
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
assert(AspectInitRegistry.initFor(obj) eq null)
}
}
}
it("should restart supervised, non-annotated active object on failure") {
val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(2)
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail
fail("expected exception not thrown")
} catch {
case e: RuntimeException => {
cdl.await
assert(obj._pre)
assert(obj._post)
assert(!obj._down)
assert(AspectInitRegistry.initFor(obj) ne null)
}
}
}
it("should shutdown supervised, non-annotated active object on failure") {
val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(1)
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail
fail("expected exception not thrown")
} catch {
case e: RuntimeException => {
cdl.await
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
assert(AspectInitRegistry.initFor(obj) eq null)
}
}
}
it("should shutdown non-supervised, annotated active object on ActiveObject.stop") {
val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
ActiveObject.stop(obj)
assert(AspectInitRegistry.initFor(obj) eq null)
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
try {
obj.greet("akka")
fail("access to stopped active object")
} catch {
case e: Exception => { /* test passed */ }
}
}
it("should shutdown non-supervised, annotated active object on ActorRegistry.shutdownAll") {
val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
assert(AspectInitRegistry.initFor(obj) ne null)
assert("hello akka" === obj.greet("akka"))
ActorRegistry.shutdownAll
assert(AspectInitRegistry.initFor(obj) eq null)
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
try {
obj.greet("akka")
fail("access to stopped active object")
} catch {
case e: Exception => { /* test passed */ }
}
}
it("should shutdown non-supervised, non-initialized active object on ActiveObject.stop") {
val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
ActiveObject.stop(obj)
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
}
}
}

View file

@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case Tick => ()
@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false)
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
}
}).start
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
}
}

View file

@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:active-object id="active-object-with-dispatcher" target="se.scalablesolutions.akka.spring.foo.MyPojo"

View file

@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:supervision id="supervision1">
<akka:restart-strategy failover="AllForOne" retries="3" timerange="1000">

View file

@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka" xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<bean id="wrappedService"
class="se.scalablesolutions.akka.actor.ActiveObject"
@ -38,8 +40,8 @@
target="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
<akka:remote host="localhost" port="9999"/>
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
<akka:active-object id="remote-service1" target="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="1000">

View file

@ -105,7 +105,7 @@
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<!-- restart callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
@ -123,11 +123,23 @@
</xsd:attribute>
</xsd:complexType>
<!-- shutdown callbacks -->
<xsd:complexType name="shutdown-callback-type">
<xsd:attribute name="method" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Shutdown callback method that is called during shut down.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:sequence>
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="shutdown-callback" type="shutdown-callback-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>

View file

@ -1,240 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.akkasource.org/schema/akka"
targetNamespace="http://www.akkasource.org/schema/akka"
elementFormDefault="qualified" attributeFormDefault="unqualified"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"/>
<!-- base types -->
<!-- restart strategies enumeration -->
<xsd:simpleType name="failover-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="AllForOne"/>
<xsd:enumeration value="OneForOne"/>
</xsd:restriction>
</xsd:simpleType>
<!-- restart strategies enumeration -->
<xsd:simpleType name="lifecycle-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="permanent"/>
<xsd:enumeration value="temporary"/>
</xsd:restriction>
</xsd:simpleType>
<!-- Scopes enumeration -->
<xsd:simpleType name="scope-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="prototype"/>
<xsd:enumeration value="singleton"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatchers enumeration -->
<xsd:simpleType name="dispatcher-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher queue types enumeration -->
<xsd:simpleType name="dispatcher-queue-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="bounded-linked-blocking-queue"/>
<xsd:enumeration value="unbounded-linked-blocking-queue"/>
<xsd:enumeration value="synchronous-queue"/>
<xsd:enumeration value="bounded-array-blocking-queue"/>
</xsd:restriction>
</xsd:simpleType>
<!-- thread pool rejection policies enumeration -->
<xsd:simpleType name="rejection-policy-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="abort-policy"/>
<xsd:enumeration value="caller-runs-policy"/>
<xsd:enumeration value="discard-oldest-policy"/>
<xsd:enumeration value="discard-policy"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher type -->
<xsd:complexType name="dispatcher-type">
<xsd:choice minOccurs="0" maxOccurs="1">
<xsd:element name="thread-pool" type="threadpool-type"/>
</xsd:choice>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="ref" type="xsd:string"/>
<xsd:attribute name="type" type="dispatcher-enum-type"/>
<xsd:attribute name="name" type="xsd:string"/>
</xsd:complexType>
<xsd:complexType name="threadpool-type">
<xsd:attribute name="queue" type="dispatcher-queue-type"/>
<xsd:attribute name="bound" type="xsd:integer"/>
<xsd:attribute name="capacity" type="xsd:integer"/>
<xsd:attribute name="fairness" type="xsd:boolean"/>
<xsd:attribute name="core-pool-size" type="xsd:integer"/>
<xsd:attribute name="max-pool-size" type="xsd:integer"/>
<xsd:attribute name="keep-alive" type="xsd:long"/>
<xsd:attribute name="rejection-policy" type="rejection-policy-type"/>
</xsd:complexType>
<!-- Remote -->
<xsd:complexType name="remote-type">
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Pre restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="post" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Post restart callback method that is called during restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:all minOccurs="0">
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="target" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
default timeout for '!!' invocations
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set to true if messages should have REQUIRES_NEW semantics
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="interface" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Interface implemented by target class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Lifecycle, permanent or temporary
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="scope-enum-type">
<xsd:annotation>
<xsd:documentation>
Supported scopes are singleton and prototype
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- trap exits -->
<xsd:complexType name="trap-exits-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="trap-exit" type="xsd:string"/>
</xsd:choice>
</xsd:complexType>
<!-- active objects -->
<xsd:complexType name="active-objects-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="active-object" type="active-object-type"/>
</xsd:choice>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="strategy-type">
<xsd:sequence>
<xsd:element name="trap-exits" type="trap-exits-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="failover" type="failover-type">
<xsd:annotation>
<xsd:documentation>
Failover scheme, AllForOne or OneForOne
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retries" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Maximal number of retries.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timerange" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Timerange for restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="supervision-type">
<xsd:all>
<xsd:element name="restart-strategy" type="strategy-type" minOccurs="1" maxOccurs="1"/>
<xsd:element name="active-objects" type="active-objects-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="supervision" type="supervision-type" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<!-- ActiveObject -->
<xsd:element name="active-object" type="active-object-type"/>
<!-- Dispatcher -->
<xsd:element name="dispatcher" type="dispatcher-type"/>
<!-- Supervision -->
<xsd:element name="supervision" type="supervision-type"/>
</xsd:schema>

View file

@ -1,22 +1,26 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import java.beans.PropertyDescriptor
import java.lang.reflect.Method
import javax.annotation.PreDestroy
import javax.annotation.PostConstruct
import reflect.BeanProperty
import org.springframework.beans.BeanWrapperImpl
import org.springframework.beans.BeanWrapper
import org.springframework.beans.BeanUtils
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.actor.ActiveObject
import reflect.BeanProperty
import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks
import org.springframework.context.{ApplicationContext,ApplicationContextAware}
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject}
import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.Logging
@ -25,8 +29,9 @@ import se.scalablesolutions.akka.util.Logging
*
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
* @author Martin Krasser
*/
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@ -36,12 +41,28 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
@BeanProperty var transactional: Boolean = false
@BeanProperty var pre: String = ""
@BeanProperty var post: String = ""
@BeanProperty var shutdown: String = ""
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope:String = VAL_SCOPE_SINGLETON
@BeanProperty var property:PropertyEntries = _
@BeanProperty var applicationContext:ApplicationContext = _
// Holds info about if deps has been set or not. Depends on
// if interface is specified or not. We must set deps on
// target instance if interface is specified
var hasSetDependecies = false
override def isSingleton:Boolean = {
if(scope.equals(VAL_SCOPE_SINGLETON)) {
true
} else {
false
}
}
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
@ -57,28 +78,54 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: AnyRef = {
if(scope.equals(VAL_SCOPE_SINGLETON)) {
setSingleton(true)
} else {
setSingleton(false)
}
var argumentList = ""
if (isRemote) argumentList += "r"
if (hasInterface) argumentList += "i"
if (hasDispatcher) argumentList += "d"
setProperties(
create(argumentList))
}
postConstruct(
setProperties(
create(argumentList)))
}
/**
* This method manages <property/> element by injecting either
* values (<property value="value"/>) and bean references (<property ref="beanId"/>)
* Stop the active object if it is a singleton.
*/
override def destroyInstance(instance:AnyRef) {
ActiveObject.stop(instance)
}
/**
* Invokes any method annotated with @PostConstruct
* When interfaces are specified, this method is invoked both on the
* target instance and on the active object, so a developer is free do decide
* where the annotation should be. If no interface is specified it is only invoked
* on the active object
*/
private def postConstruct(ref:AnyRef) : AnyRef = {
// Invoke postConstruct method if any
for(method <- ref.getClass.getMethods) {
if(method.isAnnotationPresent(classOf[PostConstruct])) {
method.invoke(ref)
}
}
ref
}
private def setProperties(ref:AnyRef) : AnyRef = {
if(hasSetDependecies) {
return ref
}
log.debug("Processing properties and dependencies for target class %s",target)
val beanWrapper = new BeanWrapperImpl(ref);
for(entry <- property.entryList) {
if(ref.isInstanceOf[ApplicationContextAware]) {
log.debug("Setting application context")
beanWrapper.setPropertyValue("applicationContext",applicationContext)
}
for(entry <- property.entryList) {
val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name)
val method = propertyDescriptor.getWriteMethod();
@ -97,60 +144,50 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
ref
}
// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here)
//
// private[akka] def create(argList : String) : AnyRef = argList match {
// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
// }
private[akka] def create(argList : String) : AnyRef = {
if (argList == "r") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port))
} else if (argList == "ri" ) {
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, host, port, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port))
} else if (argList == "rd") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "rid") {
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, host, port, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "i") {
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig)
} else if (argList == "id") {
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.dispatcher(dispatcherInstance))
} else if (argList == "d") {
ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.dispatcher(dispatcherInstance))
} else {
ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
ActiveObject.newInstance(target.toClass, createConfig)
}
}
def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
clazz.newInstance().asInstanceOf[T]
}
/**
* create Option[RestartCallback]
*/
private def callbacks: Option[RestartCallbacks] = {
if (hasCallbacks) {
val callbacks = new RestartCallbacks(pre, post)
Some(callbacks)
} else {
None
}
private[akka] def createConfig: ActiveObjectConfiguration = {
val config = new ActiveObjectConfiguration().timeout(timeout)
if (hasRestartCallbacks) config.restartCallbacks(pre, post)
if (hasShutdownCallback) config.shutdownCallback(shutdown)
if (transactional) config.makeTransactionRequired
config
}
def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
var ref = clazz.newInstance().asInstanceOf[T]
postConstruct(
setProperties(ref))
hasSetDependecies = true
ref
}
private[akka] def isRemote = (host != null) && (!host.isEmpty)
private[akka] def hasInterface = (interface != null) && (!interface.isEmpty)
private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
private[akka] def hasRestartCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
private[akka] def hasShutdownCallback = ((shutdown != null) && !shutdown.isEmpty)
private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty)

View file

@ -13,6 +13,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException
* Parser trait for custom namespace configuration for active-object.
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
* @author Martin Krasser
*/
trait ActiveObjectParser extends BeanParser with DispatcherParser {
import AkkaSpringConfigurationTags._
@ -25,7 +26,8 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
def parseActiveObject(element: Element): ActiveObjectProperties = {
val objectProperties = new ActiveObjectProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val restartCallbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val shutdownCallbackElement = DomUtils.getChildElementByTagName(element, SHUTDOWN_CALLBACK_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
@ -34,14 +36,18 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
objectProperties.port = mandatory(remoteElement, PORT).toInt
}
if (callbacksElement != null) {
objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART)
if (restartCallbacksElement != null) {
objectProperties.preRestart = restartCallbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = restartCallbacksElement.getAttribute(POST_RESTART)
if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) {
throw new IllegalActorStateException("At least one of pre or post must be defined.")
}
}
if (shutdownCallbackElement != null) {
objectProperties.shutdown = shutdownCallbackElement.getAttribute("method")
}
if (dispatcherElement != null) {
val dispatcherProperties = parseDispatcher(dispatcherElement)
objectProperties.dispatcher = dispatcherProperties

View file

@ -10,6 +10,7 @@ import AkkaSpringConfigurationTags._
/**
* Data container for active object configuration data.
* @author michaelkober
* @author Martin Krasser
*/
class ActiveObjectProperties {
var target: String = ""
@ -18,10 +19,11 @@ class ActiveObjectProperties {
var transactional: Boolean = false
var preRestart: String = ""
var postRestart: String = ""
var shutdown: String = ""
var host: String = ""
var port: Int = _
var lifecycle: String = ""
var scope:String = ""
var scope:String = VAL_SCOPE_SINGLETON
var dispatcher: DispatcherProperties = _
var propertyEntries = new PropertyEntries()
@ -35,6 +37,7 @@ class ActiveObjectProperties {
builder.addPropertyValue(PORT, port)
builder.addPropertyValue(PRE_RESTART, preRestart)
builder.addPropertyValue(POST_RESTART, postRestart)
builder.addPropertyValue(SHUTDOWN, shutdown)
builder.addPropertyValue(TIMEOUT, timeout)
builder.addPropertyValue(TARGET, target)
builder.addPropertyValue(INTERFACE, interface)

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
/**
* XML configuration tags.
* @author michaelkober
* @author Martin Krasser
*/
object AkkaSpringConfigurationTags {
@ -20,6 +21,7 @@ object AkkaSpringConfigurationTags {
// active-object sub tags
val RESTART_CALLBACKS_TAG = "restart-callbacks"
val SHUTDOWN_CALLBACK_TAG = "shutdown-callback"
val REMOTE_TAG = "remote"
// superivision sub tags
@ -45,6 +47,7 @@ object AkkaSpringConfigurationTags {
val PORT = "port"
val PRE_RESTART = "pre"
val POST_RESTART = "post"
val SHUTDOWN = "shutdown"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"

View file

@ -0,0 +1,39 @@
package se.scalablesolutions.akka.spring;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
public class Pojo implements PojoInf,ApplicationContextAware {
private String string;
private boolean gotApplicationContext = false;
private boolean postConstructInvoked = false;
public boolean gotApplicationContext() {
return gotApplicationContext;
}
public void setApplicationContext(ApplicationContext context) {
gotApplicationContext = true;
}
public void setString(String s) {
string = s;
}
public String getString() {
return string;
}
@PostConstruct
public void create() {
postConstructInvoked = true;
}
public boolean isPostConstructInvoked() {
return postConstructInvoked;
}
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.spring;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
public interface PojoInf {
public String getString();
public boolean gotApplicationContext();
public boolean isPostConstructInvoked();
@PostConstruct
public void create();
}

View file

@ -1,9 +1,22 @@
package se.scalablesolutions.akka.spring;
import se.scalablesolutions.akka.actor.annotation.shutdown;
public class SampleBean {
public boolean down;
public SampleBean() {
down = false;
}
public String foo(String s) {
return "hello " + s;
}
@shutdown
public void shutdown() {
down = true;
}
}

View file

@ -6,16 +6,28 @@
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:active-object id="bean"
target="org.springframework.core.io.ResourceEditor"
transactional="true"
timeout="1000"
scope="prototype">
<property name="source" ref="string"/>
</akka:active-object>
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
</beans>
<akka:active-object id="sample" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000" />
<akka:active-object id="bean-singleton" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000"/>
<akka:active-object id="bean-prototype" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000" scope="prototype"/>
<akka:active-object id="bean"
target="org.springframework.core.io.ResourceEditor"
transactional="true"
timeout="1000"
scope="prototype">
<property name="source" ref="string"/>
</akka:active-object>
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
<akka:active-object id="pojoInf"
target="se.scalablesolutions.akka.spring.Pojo"
interface="se.scalablesolutions.akka.spring.PojoInf"
scope="singleton"
timeout="1000">
<property name="string" value="akka rocks"/>
</akka:active-object>
</beans>

View file

@ -54,8 +54,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcher.dispatcherType == "thread-based")
}
assert(props.dispatcher.dispatcherType === "thread-based")
}
it("should parse remote ActiveObjects configuration") {
val xml = <akka:active-object id="remote active-object" target="se.scalablesolutions.akka.spring.foo.MyPojo"
@ -64,8 +64,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.host == "com.some.host")
assert(props.port == 9999)
assert(props.host === "com.some.host")
assert(props.port === 9999)
}
}
}

View file

@ -65,10 +65,33 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
assert(target.getSource === entry.value)
}
it("should create an application context and inject a string dependency") {
it("should create an application context and verify dependency injection") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
assert(target.getSource === "someString")
val pojoInf = ctx.getBean("pojoInf").asInstanceOf[PojoInf];
println("pojoInf = " + pojoInf.getString)
Thread.sleep(200)
assert(pojoInf.isPostConstructInvoked)
assert(pojoInf.getString == "akka rocks")
assert(pojoInf.gotApplicationContext)
}
it("should stop the created active object when scope is singleton and the context is closed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target = ctx.getBean("bean-singleton").asInstanceOf[SampleBean]
assert(!target.down)
ctx.close
assert(target.down)
}
it("should not stop the created active object when scope is prototype and the context is closed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target = ctx.getBean("bean-prototype").asInstanceOf[SampleBean]
assert(!target.down)
ctx.close
assert(!target.down)
}
}
}

View file

@ -49,11 +49,21 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
parser.parseSupervisor(createSupervisorElement, builder);
val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActiveObjectProperties]]
assert(supervised != null)
expect(3) { supervised.length }
expect(4) { supervised.length }
val iterator = supervised.iterator
expect("foo.bar.Foo") { iterator.next.target }
expect("foo.bar.Bar") { iterator.next.target }
expect("foo.bar.MyPojo") { iterator.next.target }
val prop1 = iterator.next
val prop2 = iterator.next
val prop3 = iterator.next
val prop4 = iterator.next
expect("foo.bar.Foo") { prop1.target }
expect("foo.bar.Bar") { prop2.target }
expect("foo.bar.MyPojo") { prop3.target }
expect("foo.bar.MyPojo") { prop4.target }
expect("preRestart") { prop3.preRestart }
expect("postRestart") { prop3.postRestart }
expect("shutdown") { prop4.shutdown }
expect("permanent") { prop1.lifecycle }
expect("temporary") { prop4.lifecycle }
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
@ -87,6 +97,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
<akka:active-object target="foo.bar.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
<akka:active-object target="foo.bar.MyPojo" lifecycle="temporary" timeout="1000">
<akka:shutdown-callback method="shutdown"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
dom(xml).getDocumentElement