diff --git a/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/TypedActorGuiceConfiguratorTest.java similarity index 100% rename from akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/TypedActorGuiceConfiguratorTest.java diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index e267fcd077..2497b47a00 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -132,10 +132,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn result match { case Some(msg: Failure) => exchange.fromFailureMessage(msg) case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) - case None => { - throw new TimeoutException("timeout (%d ms) while waiting response from %s" - format (actor.timeout, ep.getEndpointUri)) - } + case None => throw new TimeoutException("timeout (%d ms) while waiting response from %s" + format (actor.timeout, ep.getEndpointUri)) } } diff --git a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala b/akka-camel/src/main/scala/component/TypedActorComponent.scala similarity index 100% rename from akka-camel/src/main/scala/component/ActiveObjectComponent.scala rename to akka-camel/src/main/scala/component/TypedActorComponent.scala diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java index f26719585e..8866acf573 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java @@ -19,6 +19,4 @@ public class PojoImpl extends TypedActor implements PojoIntf { public String m2(@Body String b, @Header("test") String h) { return "m2impl: " + b + " " + h; } - - } diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala index cd52511517..1fcbcadd64 100644 --- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -180,7 +180,7 @@ object CamelServiceFeatureTest { } class TestBlocker(uri: String) extends Actor with Consumer { - self.timeout = 1 + self.timeout = 1000 def endpointUri = uri override def blocking = true protected def receive = { diff --git a/akka-core/src/main/scala/actor/TypedActor.scala b/akka-core/src/main/scala/actor/TypedActor.scala index a40a3d6adf..971551620e 100644 --- a/akka-core/src/main/scala/actor/TypedActor.scala +++ b/akka-core/src/main/scala/actor/TypedActor.scala @@ -26,6 +26,66 @@ import scala.reflect.BeanProperty /** * FIXME: document TypedActor * + * Here is an example of usage (in Java): + *
+ * class PingImpl extends TypedActor implements Ping {
+ * public void hit(int count) {
+ * Pong pong = (Pong) getContext().getSender();
+ * pong.hit(count++);
+ * }
+ *
+ * @Override
+ * public void init() {
+ * ... // optional initialization on start
+ * }
+ *
+ * @Override
+ * public void shutdown() {
+ * ... // optional cleanup on stop
+ * }
+ *
+ * ... // more life-cycle callbacks if needed
+ * }
+ *
+ * // create the ping actor
+ * Ping ping = TypedActor.newInstance(Ping.class, PingImpl.class);
+ *
+ * ping.hit(1); // use the actor
+ * ping.hit(1);
+ *
+ * // stop the actor
+ * TypedActor.stop(ping);
+ *
+ *
+ * Here is an example of usage (in Scala):
+ *
+ * class PingImpl extends TypedActor with Ping {
+ * def hit(count: Int) = {
+ * val pong = context.sender.asInstanceOf[Pong]
+ * pong.hit(count += 1)
+ * }
+ *
+ * override def init = {
+ * ... // optional initialization on start
+ * }
+ *
+ * override def shutdown = {
+ * ... // optional cleanup on stop
+ * }
+ *
+ * ... // more life-cycle callbacks if needed
+ * }
+ *
+ * // create the ping actor
+ * val ping = TypedActor.newInstance(classOf[Ping], classOf[PingImpl])
+ *
+ * ping.hit(1) // use the actor
+ * ping.hit(1)
+ *
+ * // stop the actor
+ * TypedActor.stop(ping)
+ *
+ *
* @author Jonas Bonér
*/
abstract class TypedActor extends Logging {
@@ -37,12 +97,26 @@ abstract class TypedActor extends Logging {
* This class does not contain static information but is updated by the runtime system
* at runtime.
*
+ * You can get a hold of the context using either the 'getContext()' or 'context'
+ * methods from the 'TypedActor' base class.
+ *
+ *
* Here is an example of usage (in Java):
*
- * class PingImpl exends TypedActor implements Ping {
+ * class PingImpl extends TypedActor implements Ping {
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
- * pong.hit(count++)
+ * pong.hit(count++);
+ * }
+ * }
+ *
+ *
+ * Here is an example of usage (in Scala):
+ *
+ * class PingImpl extends TypedActor with Ping {
+ * def hit(count: Int) = {
+ * val pong = context.sender.asInstanceOf[Pong]
+ * pong.hit(count += 1)
* }
* }
*
@@ -50,7 +124,7 @@ abstract class TypedActor extends Logging {
@BeanProperty protected var context: TypedActorContext = _
/**
- * The uuid for the typed actor.
+ * The uuid for the Typed Actor.
*/
@BeanProperty @volatile var uuid = UUID.newUuid.toString
@@ -172,12 +246,25 @@ final class TypedActorConfiguration {
* This class does not contain static information but is updated by the runtime system
* at runtime.
*
+ * You can get a hold of the context using either the 'getContext()' or 'context'
+ * methods from the 'TypedActor' base class.
+ *
* Here is an example of usage (from Java):
*
- * class PingImpl exends TypedActor implements Ping {
+ * class PingImpl extends TypedActor implements Ping {
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
- * pong.hit(count++)
+ * pong.hit(count++);
+ * }
+ * }
+ *
+ *
+ * Here is an example of usage (in Scala):
+ *
+ * class PingImpl extends TypedActor with Ping {
+ * def hit(count: Int) = {
+ * val pong = context.sender.asInstanceOf[Pong]
+ * pong.hit(count += 1)
* }
* }
*
@@ -185,6 +272,7 @@ final class TypedActorConfiguration {
* @author Jonas Bonér
*/
final class TypedActorContext {
+ private[akka] var _self: AnyRef = _
private[akka] var _sender: AnyRef = _
private[akka] var _senderFuture: CompletableFuture[Any] = _
@@ -248,7 +336,7 @@ object TypedActor extends Logging {
}
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = {
- val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
+ val actor = actorOf(new Dispatcher(config._transactionRequired))
if (config._messageDispatcher.isDefined) actor.dispatcher = config._messageDispatcher.get
newInstance(intfClass, newTypedActor(targetClass), actor, config._host, config.timeout)
}
@@ -257,7 +345,7 @@ object TypedActor extends Logging {
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectTypedActorContext(targetInstance)
val proxy = Proxy.newInstance(Array(intfClass), Array(targetInstance), true, false)
- actorRef.actor.asInstanceOf[Dispatcher].initialize(targetInstance.getClass, targetInstance, context)
+ actorRef.actor.asInstanceOf[Dispatcher].initialize(targetInstance.getClass, targetInstance, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intfClass, targetInstance, actorRef, remoteAddress, timeout))
@@ -274,7 +362,7 @@ object TypedActor extends Logging {
else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
}
val context = injectTypedActorContext(proxy)
- actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, context)
+ actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
@@ -282,81 +370,81 @@ object TypedActor extends Logging {
proxy.asInstanceOf[T]
}
- def stop(obj: AnyRef): Unit = {
- val init = AspectInitRegistry.initFor(obj)
- init.actorRef.stop
- }
-
/**
- * Get the underlying dispatcher actor for the given typed actor.
+ * Stops the current Typed Actor.
*/
- def actorFor(obj: AnyRef): Option[ActorRef] =
- ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj))
+ def stop(proxy: AnyRef): Unit = AspectInitRegistry.initFor(proxy).actorRef.stop
/**
- * Links an other typed actor to this typed actor.
- * @param supervisor the supervisor typed actor
- * @param supervised the typed actor to link
+ * Get the underlying dispatcher actor for the given Typed Actor.
+ */
+ def actorFor(proxy: AnyRef): Option[ActorRef] =
+ ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].proxy == proxy)
+
+ /**
+ * Links an other Typed Actor to this Typed Actor.
+ * @param supervisor the supervisor Typed Actor
+ * @param supervised the Typed Actor to link
*/
def link(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(
- throw new IllegalActorStateException("Can't link when the supervisor is not an typed actor"))
+ throw new IllegalActorStateException("Can't link when the supervisor is not an Typed Actor"))
val supervisedActor = actorFor(supervised).getOrElse(
- throw new IllegalActorStateException("Can't link when the supervised is not an typed actor"))
+ throw new IllegalActorStateException("Can't link when the supervised is not an Typed Actor"))
supervisorActor.link(supervisedActor)
}
/**
- * Links an other typed actor to this typed actor and sets the fault handling for the supervisor.
- * @param supervisor the supervisor typed actor
- * @param supervised the typed actor to link
+ * Links an other Typed Actor to this Typed Actor and sets the fault handling for the supervisor.
+ * @param supervisor the supervisor Typed Actor
+ * @param supervised the Typed Actor to link
* @param handler fault handling strategy
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def link(supervisor: AnyRef, supervised: AnyRef,
handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(
- throw new IllegalActorStateException("Can't link when the supervisor is not an typed actor"))
+ throw new IllegalActorStateException("Can't link when the supervisor is not an Typed Actor"))
val supervisedActor = actorFor(supervised).getOrElse(
- throw new IllegalActorStateException("Can't link when the supervised is not an typed actor"))
+ throw new IllegalActorStateException("Can't link when the supervised is not an Typed Actor"))
supervisorActor.trapExit = trapExceptions.toList
supervisorActor.faultHandler = Some(handler)
supervisorActor.link(supervisedActor)
}
/**
- * Unlink the supervised typed actor from the supervisor.
- * @param supervisor the supervisor typed actor
- * @param supervised the typed actor to unlink
+ * Unlink the supervised Typed Actor from the supervisor.
+ * @param supervisor the supervisor Typed Actor
+ * @param supervised the Typed Actor to unlink
*/
def unlink(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(
- throw new IllegalActorStateException("Can't unlink when the supervisor is not an typed actor"))
+ throw new IllegalActorStateException("Can't unlink when the supervisor is not an Typed Actor"))
val supervisedActor = actorFor(supervised).getOrElse(
- throw new IllegalActorStateException("Can't unlink when the supervised is not an typed actor"))
+ throw new IllegalActorStateException("Can't unlink when the supervised is not an Typed Actor"))
supervisorActor.unlink(supervisedActor)
}
/**
- * Sets the trap exit for the given supervisor typed actor.
- * @param supervisor the supervisor typed actor
+ * Sets the trap exit for the given supervisor Typed Actor.
+ * @param supervisor the supervisor Typed Actor
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(
- throw new IllegalActorStateException("Can't set trap exceptions when the supervisor is not an typed actor"))
+ throw new IllegalActorStateException("Can't set trap exceptions when the supervisor is not an Typed Actor"))
supervisorActor.trapExit = trapExceptions.toList
this
}
/**
- * Sets the fault handling strategy for the given supervisor typed actor.
- * @param supervisor the supervisor typed actor
+ * Sets the fault handling strategy for the given supervisor Typed Actor.
+ * @param supervisor the supervisor Typed Actor
* @param handler fault handling strategy
*/
def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = {
val supervisorActor = actorFor(supervisor).getOrElse(
- throw new IllegalActorStateException("Can't set fault handler when the supervisor is not an typed actor"))
+ throw new IllegalActorStateException("Can't set fault handler when the supervisor is not an Typed Actor"))
supervisorActor.faultHandler = Some(handler)
this
}
@@ -536,6 +624,7 @@ private[akka] sealed class TypedActorAspect {
.setTarget(targetInstance.getClass.getName)
.setTimeout(timeout)
.setActorType(ActorType.TYPED_ACTOR)
+ .setTypedActorInfo(typedActorInfo)
.build
val requestBuilder = RemoteRequestProtocol.newBuilder
@@ -633,93 +722,30 @@ object Dispatcher {
*
* @author Jonas Bonér
*/
-private[akka] class Dispatcher(transactionalRequired: Boolean,
- var restartCallbacks: Option[RestartCallbacks] = None,
- var shutdownCallback: Option[ShutdownCallback] = None) extends Actor {
+private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
import Dispatcher._
- private[actor] var target: Option[AnyRef] = None
- private var zhutdown: Option[Method] = None
- private var preRestart: Option[Method] = None
- private var postRestart: Option[Method] = None
- private var initTxState: Option[Method] = None
+ private[actor] var proxy: AnyRef = _
private var context: Option[TypedActorContext] = None
private var targetClass: Class[_] = _
+ @volatile private[akka] var targetInstance: TypedActor = _
- def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
-
- private[actor] def initialize(targetClass: Class[_], proxy: AnyRef, ctx: Option[TypedActorContext]) = {
-
- if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
- self.makeTransactionRequired
+ private[actor] def initialize(
+ targetClass: Class[_], targetInstance: TypedActor, proxy: AnyRef, ctx: Option[TypedActorContext]) = {
+ if (transactionalRequired || isTransactional(targetClass)) self.makeTransactionRequired
+
self.id = targetClass.getName
this.targetClass = targetClass
- target = Some(proxy)
- context = ctx
- val proxyClass = proxy.getClass
- val methods = proxyClass.getDeclaredMethods.toList
+ this.proxy = proxy
+ this.targetInstance = targetInstance
+ this.context = ctx
if (self.lifeCycle.isEmpty) self.lifeCycle = Some(LifeCycle(Permanent))
-
- // See if we have any config define restart callbacks
- restartCallbacks match {
- case None => {}
- case Some(RestartCallbacks(pre, post)) =>
- preRestart = Some(try {
- proxyClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*)
- } catch { case e => throw new IllegalActorStateException(
- "Could not find pre restart method [" + pre + "] \nin [" +
- targetClass.getName + "]. \nIt must have a zero argument definition.") })
- postRestart = Some(try {
- proxyClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*)
- } catch { case e => throw new IllegalActorStateException(
- "Could not find post restart method [" + post + "] \nin [" +
- targetClass.getName + "]. \nIt must have a zero argument definition.") })
- }
- // See if we have any config define a shutdown callback
- shutdownCallback match {
- case None => {}
- case Some(ShutdownCallback(down)) =>
- zhutdown = Some(try {
- proxyClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*)
- } catch { case e => throw new IllegalStateException(
- "Could not find shutdown method [" + down + "] \nin [" +
- targetClass.getName + "]. \nIt must have a zero argument definition.") })
- }
-
- // See if we have any annotation defined restart callbacks
- if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
- if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
- // See if we have an annotation defined shutdown callback
- if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown))
-
- if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
- throw new IllegalActorStateException(
- "Method annotated with @prerestart or defined as a restart callback in \n[" +
- targetClass.getName + "] must have a zero argument definition")
- if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0)
- throw new IllegalActorStateException(
- "Method annotated with @postrestart or defined as a restart callback in \n[" +
- targetClass.getName + "] must have a zero argument definition")
- if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0)
- throw new IllegalStateException(
- "Method annotated with @shutdown or defined as a shutdown callback in \n[" +
- targetClass.getName + "] must have a zero argument definition")
-
- if (preRestart.isDefined) preRestart.get.setAccessible(true)
- if (postRestart.isDefined) postRestart.get.setAccessible(true)
- if (zhutdown.isDefined) zhutdown.get.setAccessible(true)
-
- // see if we have a method annotated with @inittransactionalstate, if so invoke it
- initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
- if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0)
- throw new IllegalActorStateException("Method annotated with @inittransactionalstate must have a zero argument definition")
- if (initTxState.isDefined) initTxState.get.setAccessible(true)
}
def receive = {
case invocation @ Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
- TypedActor.log.ifTrace("Invoking typed actor with message:\n" + invocation)
+ TypedActor.log.ifTrace("Invoking Typed Actor with message:\n" + invocation)
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
@@ -731,55 +757,45 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
else self.reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
- case Link(target) => self.link(target)
- case Unlink(target) => self.unlink(target)
+ case Link(proxy) => self.link(proxy)
+ case Unlink(proxy) => self.unlink(proxy)
case unexpected => throw new IllegalActorStateException(
"Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
override def preRestart(reason: Throwable) {
- try {
- // Since preRestart is called we know that this dispatcher
- // is about to be restarted. Put the instance in a thread
- // local so the new dispatcher can be initialized with the
- // contents of the old.
- //FIXME - This should be considered as a workaround.
- crashedActorTl.set(this)
- preRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
- } catch { case e: InvocationTargetException => throw e.getCause }
+ crashedActorTl.set(this)
+ targetInstance.preRestart(reason)
}
override def postRestart(reason: Throwable) {
- try {
- postRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
- } catch { case e: InvocationTargetException => throw e.getCause }
+ targetInstance.postRestart(reason)
}
- override def init = {
+ override def init {
// Get the crashed dispatcher from thread local and intitialize this actor with the
// contents of the old dispatcher
- val oldActor = crashedActorTl.get();
+ val oldActor = crashedActorTl.get
if (oldActor != null) {
- initialize(oldActor.targetClass, oldActor.target.get, oldActor.context)
+ initialize(oldActor.targetClass, oldActor.targetInstance, oldActor.proxy, oldActor.context)
crashedActorTl.set(null)
}
}
- override def shutdown = {
- try {
- zhutdown.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*))
- } catch { case e: InvocationTargetException => throw e.getCause
- } finally {
- AspectInitRegistry.unregister(target.get);
- }
+ override def shutdown {
+ targetInstance.shutdown
+ AspectInitRegistry.unregister(proxy);
}
- override def initTransactionalState = {
- try {
- if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
- } catch { case e: InvocationTargetException => throw e.getCause }
+ override def initTransactionalState {
+ targetInstance.initTransactionalState
}
+ def isTransactional(clazz: Class[_]): Boolean =
+ if (clazz == null) false
+ else if (clazz.isAnnotationPresent(Annotations.transactionrequired)) true
+ else isTransactional(clazz.getSuperclass)
+
private def serializeArguments(joinPoint: JoinPoint) = {
val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
var unserializable = false
diff --git a/akka-core/src/main/scala/config/TypedActorGuiceConfigurator.scala b/akka-core/src/main/scala/config/TypedActorGuiceConfigurator.scala
index f6c78a6a30..29efc14082 100644
--- a/akka-core/src/main/scala/config/TypedActorGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/TypedActorGuiceConfigurator.scala
@@ -84,9 +84,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
val targetClass =
if (component.target.isInstanceOf[Class[_ <: TypedActor]]) component.target.asInstanceOf[Class[_ <: TypedActor]]
else throw new IllegalArgumentException("TypedActor [" + component.target.getName + "] must be a subclass of TypedActor")
- val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
- component.lifeCycle.restartCallbacks,
- component.lifeCycle.shutdownCallback))
+ val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
@@ -100,26 +98,30 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
}
private def newDelegatingProxy(component: Component): DependencyBinding = {
+ component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
+
val targetClass = component.intf.get
val instance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
+
val targetInstance =
- if (instance.isInstanceOf[TypedActor]) component.target.asInstanceOf[TypedActor]
+ if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("TypedActor [" + component.target.getName + "] must be a subclass of TypedActor")
- if (!component.target.isInstanceOf[TypedActor]) throw new IllegalArgumentException(
- "TypedActor [" + component.target + "] must be a subclass of TypedActor")
- component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
- val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
- component.lifeCycle.restartCallbacks,
- component.lifeCycle.shutdownCallback))
+
+ val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired))
+
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
+
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
+
val proxy = TypedActor.newInstance(
targetClass, targetInstance, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+
remoteAddress.foreach(address => RemoteServer.registerTypedActor(address, targetClass.getName, proxy))
supervised ::= Supervise(actorRef, component.lifeCycle)
+
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java
index 09b50a7347..9cb41a85cf 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/BarImpl.java
@@ -1,13 +1,16 @@
package se.scalablesolutions.akka.actor;
import com.google.inject.Inject;
+import se.scalablesolutions.akka.actor.*;
-public class BarImpl implements Bar {
+public class BarImpl extends TypedActor implements Bar {
@Inject
private Ext ext;
+
public Ext getExt() {
return ext;
}
+
public void bar(String msg) {
}
}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/Foo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/Foo.java
index 87eb809aba..4cc5b977dc 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/Foo.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/Foo.java
@@ -1,34 +1,14 @@
package se.scalablesolutions.akka.actor;
-import com.google.inject.Inject;
-
-public class Foo extends se.scalablesolutions.akka.serialization.Serializable.JavaJSON {
- @Inject
- private Bar bar;
- public Foo body() { return this; }
- public Bar getBar() {
- return bar;
- }
- public String foo(String msg) {
- return msg + "return_foo ";
- }
- public void bar(String msg) {
- bar.bar(msg);
- }
- public String longRunning() {
- try {
- Thread.sleep(1200);
- } catch (InterruptedException e) {
- }
- return "test";
- }
- public String throwsException() {
- if (true) throw new RuntimeException("Expected exception; to test fault-tolerance");
- return "test";
- }
+public interface Foo {
+ public Foo body();
+ public Bar getBar();
- public int $tag() throws java.rmi.RemoteException
- {
- return 0;
- }
+ public String foo(String msg);
+ public void bar(String msg);
+
+ public String longRunning();
+ public String throwsException();
+
+ public int $tag() throws java.rmi.RemoteException;
}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/FooImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/FooImpl.java
new file mode 100644
index 0000000000..dc6aba481c
--- /dev/null
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/FooImpl.java
@@ -0,0 +1,40 @@
+package se.scalablesolutions.akka.actor;
+
+import com.google.inject.Inject;
+import se.scalablesolutions.akka.actor.*;
+
+public class FooImpl extends TypedActor implements Foo {
+ @Inject
+ private Bar bar;
+
+ public Foo body() { return this; }
+
+ public Bar getBar() {
+ return bar;
+ }
+
+ public String foo(String msg) {
+ return msg + "return_foo ";
+ }
+
+ public void bar(String msg) {
+ bar.bar(msg);
+ }
+
+ public String longRunning() {
+ try {
+ Thread.sleep(1200);
+ } catch (InterruptedException e) {
+ }
+ return "test";
+ }
+
+ public String throwsException() {
+ if (true) throw new RuntimeException("Expected exception; to test fault-tolerance");
+ return "test";
+ }
+
+ public int $tag() throws java.rmi.RemoteException {
+ return 0;
+ }
+}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java
index 715953152e..2e3da4f038 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java
@@ -3,9 +3,6 @@ package se.scalablesolutions.akka.actor;
import java.util.concurrent.CountDownLatch;
public interface SamplePojo {
- public boolean pre();
- public boolean post();
- public boolean down();
public CountDownLatch newCountdownLatch(int count);
public String greet(String s);
public String fail();
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java
index da241484e2..b46a9f8fa4 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java
@@ -8,10 +8,15 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
private CountDownLatch latch;
- public boolean _pre = false;
- public boolean _post = false;
- public boolean _down = false;
-
+ public static boolean _pre = false;
+ public static boolean _post = false;
+ public static boolean _down = false;
+ public static void reset() {
+ _pre = false;
+ _post = false;
+ _down = false;
+ }
+
public SamplePojoImpl() {
latch = new CountDownLatch(1);
}
@@ -21,18 +26,6 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
return latch;
}
- public boolean pre() {
- return _pre;
- }
-
- public boolean post() {
- return _post;
- }
-
- public boolean down() {
- return _down;
- }
-
public String greet(String s) {
return "hello " + s;
}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java
index 340afe6f65..d7ab60b676 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojo.java
@@ -8,6 +8,4 @@ public interface SimpleJavaPojo {
public void setName(String name);
public String getName();
public void throwException();
- public boolean pre();
- public boolean post();
}
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java
index 0e2e72ff06..3b3508c0ab 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SimpleJavaPojoImpl.java
@@ -5,19 +5,17 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture;
public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo {
- public boolean pre = false;
- public boolean post = false;
+ public static boolean _pre = false;
+ public static boolean _post = false;
+ public static boolean _down = false;
+ public static void reset() {
+ _pre = false;
+ _post = false;
+ _down = false;
+ }
private String name;
- public boolean pre() {
- return pre;
- }
-
- public boolean post() {
- return post;
- }
-
public Object getSender() {
return getContext().getSender();
}
@@ -36,14 +34,12 @@ public class SimpleJavaPojoImpl extends TypedActor implements SimpleJavaPojo {
@Override
public void preRestart(Throwable e) {
- System.out.println("** pre()");
- pre = true;
+ _pre = true;
}
@Override
public void postRestart(Throwable e) {
- System.out.println("** post()");
- post = true;
+ _post = true;
}
public void throwException() {
diff --git a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/NestedTransactionalTypedActorSpec.scala
similarity index 100%
rename from akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala
rename to akka-core/src/test/scala/NestedTransactionalTypedActorSpec.scala
diff --git a/akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RemoteTransactionalTypedActorSpec.scala
similarity index 100%
rename from akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala
rename to akka-core/src/test/scala/RemoteTransactionalTypedActorSpec.scala
diff --git a/akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RestartNestedTransactionalTypedActorSpec.scala
similarity index 99%
rename from akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala
rename to akka-core/src/test/scala/RestartNestedTransactionalTypedActorSpec.scala
index 7e23b00957..d853230371 100644
--- a/akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala
+++ b/akka-core/src/test/scala/RestartNestedTransactionalTypedActorSpec.scala
@@ -27,6 +27,7 @@ class RestartNestedTransactionalTypedActorSpec extends
private var messageLog = ""
override def beforeAll {
+ /*
Config.config
conf.configure(
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
@@ -41,11 +42,14 @@ class RestartNestedTransactionalTypedActorSpec extends
new LifeCycle(new Permanent),
10000)
).toArray).supervise
+ */
}
override def afterAll {
+ /*
conf.stop
ActorRegistry.shutdownAll
+ */
}
describe("Restart nested supervised transactional Active Object") {
diff --git a/akka-core/src/test/scala/RestartTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RestartTransactionalTypedActorSpec.scala
similarity index 100%
rename from akka-core/src/test/scala/RestartTransactionalActiveObjectSpec.scala
rename to akka-core/src/test/scala/RestartTransactionalTypedActorSpec.scala
diff --git a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/TransactionalTypedActorSpec.scala
similarity index 100%
rename from akka-core/src/test/scala/TransactionalActiveObjectSpec.scala
rename to akka-core/src/test/scala/TransactionalTypedActorSpec.scala
diff --git a/akka-core/src/test/scala/TypedActorGuiceConfiguratorSpec.scala b/akka-core/src/test/scala/TypedActorGuiceConfiguratorSpec.scala
index b43609a4f5..d076ec52cf 100644
--- a/akka-core/src/test/scala/TypedActorGuiceConfiguratorSpec.scala
+++ b/akka-core/src/test/scala/TypedActorGuiceConfiguratorSpec.scala
@@ -40,6 +40,7 @@ class TypedActorGuiceConfiguratorSpec extends
List(
new Component(
classOf[Foo],
+ classOf[FooImpl],
new LifeCycle(new Permanent),
1000,
dispatcher),
diff --git a/akka-core/src/test/scala/TypedActorLifecycleSpec.scala b/akka-core/src/test/scala/TypedActorLifecycleSpec.scala
index bf5be69310..f17f50e9a5 100644
--- a/akka-core/src/test/scala/TypedActorLifecycleSpec.scala
+++ b/akka-core/src/test/scala/TypedActorLifecycleSpec.scala
@@ -15,8 +15,8 @@ import se.scalablesolutions.akka.config.JavaConfig._
*/
@RunWith(classOf[JUnitRunner])
class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
- var conf1: TypedActorConfigurator = _
- var conf2: TypedActorConfigurator = _
+// var conf1: TypedActorConfigurator = _
+// var conf2: TypedActorConfigurator = _
var conf3: TypedActorConfigurator = _
var conf4: TypedActorConfigurator = _
@@ -24,8 +24,8 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
// val comp1 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Permanent()), 1000)
// val comp2 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Temporary()), 1000)
- val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Permanent(), new RestartCallbacks("pre", "post")), 1000)
- val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Temporary(), new ShutdownCallback("down")), 1000)
+ val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Permanent()), 1000)
+ val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Temporary()), 1000)
// conf1 = new TypedActorConfigurator().configure(strategy, Array(comp1)).supervise
// conf2 = new TypedActorConfigurator().configure(strategy, Array(comp2)).supervise
conf3 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
@@ -40,44 +40,8 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
describe("TypedActor lifecycle management") {
- /*
- it("should restart supervised, annotated typed actor on failure") {
- val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
- val cdl = obj.newCountdownLatch(2)
- assert(AspectInitRegistry.initFor(obj) ne null)
- try {
- obj.fail
- fail("expected exception not thrown")
- } catch {
- case e: RuntimeException => {
- cdl.await
- assert(obj.pre)
- assert(obj.post)
- assert(!obj.down)
- assert(AspectInitRegistry.initFor(obj) ne null)
- }
- }
- }
-
- it("should shutdown supervised, annotated typed actor on failure") {
- val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
- val cdl = obj.newCountdownLatch(1)
- assert(AspectInitRegistry.initFor(obj) ne null)
- try {
- obj.fail
- fail("expected exception not thrown")
- } catch {
- case e: RuntimeException => {
- cdl.await
- assert(!obj.pre)
- assert(!obj.post)
- assert(obj.down)
- assert(AspectInitRegistry.initFor(obj) eq null)
- }
- }
- }
-*/
it("should restart supervised, non-annotated typed actor on failure") {
+ SamplePojoImpl.reset
val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(2)
assert(AspectInitRegistry.initFor(obj) ne null)
@@ -87,15 +51,16 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
} catch {
case e: RuntimeException => {
cdl.await
- assert(obj.pre)
- assert(obj.post)
- assert(!obj.down)
+ assert(SamplePojoImpl._pre)
+ assert(SamplePojoImpl._post)
+ assert(!SamplePojoImpl._down)
assert(AspectInitRegistry.initFor(obj) ne null)
}
}
}
it("should shutdown supervised, non-annotated typed actor on failure") {
+ SamplePojoImpl.reset
val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(1)
assert(AspectInitRegistry.initFor(obj) ne null)
@@ -105,65 +70,104 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
} catch {
case e: RuntimeException => {
cdl.await
- assert(!obj.pre)
- assert(!obj.post)
- assert(obj.down)
+ assert(!SamplePojoImpl._pre)
+ assert(!SamplePojoImpl._post)
+ assert(SamplePojoImpl._down)
assert(AspectInitRegistry.initFor(obj) eq null)
}
}
}
-/*
- it("should shutdown non-supervised, annotated typed actor on TypedActor.stop") {
- val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
- assert(AspectInitRegistry.initFor(obj) ne null)
- assert("hello akka" === obj.greet("akka"))
- TypedActor.stop(obj)
- assert(AspectInitRegistry.initFor(obj) eq null)
- assert(!obj.pre)
- assert(!obj.post)
- assert(obj.down)
- try {
- obj.greet("akka")
- fail("access to stopped typed actor")
- } catch {
- case e: Exception => {}
- }
- }
-
- it("should shutdown non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
- val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
- assert(AspectInitRegistry.initFor(obj) ne null)
- assert("hello akka" === obj.greet("akka"))
- ActorRegistry.shutdownAll
- assert(AspectInitRegistry.initFor(obj) eq null)
- assert(!obj.pre)
- assert(!obj.post)
- assert(obj.down)
- try {
- obj.greet("akka")
- fail("access to stopped typed actor")
- } catch {
- case e: Exception => { /* test passed */ }
- }
- }
- */
it("should shutdown non-supervised, non-initialized typed actor on TypedActor.stop") {
+ SamplePojoImpl.reset
val obj = TypedActor.newInstance(classOf[SamplePojo], classOf[SamplePojoImpl])
TypedActor.stop(obj)
- assert(!obj.pre)
- assert(!obj.post)
- assert(obj.down)
+ assert(!SamplePojoImpl._pre)
+ assert(!SamplePojoImpl._post)
+ assert(SamplePojoImpl._down)
}
it("both preRestart and postRestart methods should be invoked when an actor is restarted") {
+ SamplePojoImpl.reset
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
- link(supervisor,pojo, new OneForOneStrategy(3, 2000),Array(classOf[Throwable]))
+ link(supervisor, pojo, new OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
pojo.throwException
Thread.sleep(500)
- pojo.pre should be(true)
- pojo.post should be(true)
+ SimpleJavaPojoImpl._pre should be(true)
+ SimpleJavaPojoImpl._post should be(true)
}
+
+ /*
+ it("should shutdown non-supervised, annotated typed actor on TypedActor.stop") {
+ val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ assert("hello akka" === obj.greet("akka"))
+ TypedActor.stop(obj)
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ assert(!obj.pre)
+ assert(!obj.post)
+ assert(obj.down)
+ try {
+ obj.greet("akka")
+ fail("access to stopped typed actor")
+ } catch {
+ case e: Exception => {}
+ }
+ }
+
+ it("should shutdown non-supervised, annotated typed actor on ActorRegistry.shutdownAll") {
+ val obj = TypedActor.newInstance(classOf[SamplePojoAnnotated])
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ assert("hello akka" === obj.greet("akka"))
+ ActorRegistry.shutdownAll
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ assert(!obj.pre)
+ assert(!obj.post)
+ assert(obj.down)
+ try {
+ obj.greet("akka")
+ fail("access to stopped typed actor")
+ } catch {
+ case e: Exception => { }
+ }
+ }
+
+ it("should restart supervised, annotated typed actor on failure") {
+ val obj = conf1.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
+ val cdl = obj.newCountdownLatch(2)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(obj.pre)
+ assert(obj.post)
+ assert(!obj.down)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ }
+ }
+ }
+
+ it("should shutdown supervised, annotated typed actor on failure") {
+ val obj = conf2.getInstance[SamplePojoAnnotated](classOf[SamplePojoAnnotated])
+ val cdl = obj.newCountdownLatch(1)
+ assert(AspectInitRegistry.initFor(obj) ne null)
+ try {
+ obj.fail
+ fail("expected exception not thrown")
+ } catch {
+ case e: RuntimeException => {
+ cdl.await
+ assert(!obj.pre)
+ assert(!obj.post)
+ assert(obj.down)
+ assert(AspectInitRegistry.initFor(obj) eq null)
+ }
+ }
+ }
+ */
}
}
\ No newline at end of file
diff --git a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/ActiveObjectConfigurationTest.java b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/TypedActorConfigurationTest.java
similarity index 100%
rename from akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/ActiveObjectConfigurationTest.java
rename to akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/TypedActorConfigurationTest.java
diff --git a/akka-spring/src/main/scala/AkkaBeansException.scala b/akka-spring/src/main/scala/AkkaBeansException.scala
deleted file mode 100644
index 8cbffa86f7..0000000000
--- a/akka-spring/src/main/scala/AkkaBeansException.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package se.scalablesolutions.akka.spring
-
-import org.springframework.beans.BeansException
-
-/**
-* Exception to use when something goes wrong during bean creation
-@author Johan Rask
-*/
-class AkkaBeansException(errorMsg:String,t:Throwable) extends BeansException(errorMsg,t) {
-
- def this(errorMsg:String) = {
- this(errorMsg,null)
- }
-}
diff --git a/akka-spring/src/main/scala/StringReflect.scala b/akka-spring/src/main/scala/StringReflect.scala
index 7dda9dba08..ae32350e2d 100644
--- a/akka-spring/src/main/scala/StringReflect.scala
+++ b/akka-spring/src/main/scala/StringReflect.scala
@@ -1,9 +1,11 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB