diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 383c6d9545..8aa34a3187 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -21,8 +21,8 @@ import akka.serialization._ * @author Jonas Bonér */ sealed trait ActorRegistryEvent -case class ActorRegistered(address: String, actor: ActorRef) extends ActorRegistryEvent -case class ActorUnregistered(address: String, actor: ActorRef) extends ActorRegistryEvent +case class ActorRegistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent +case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. @@ -66,11 +66,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag actorsByAddress.put(address, actor) actorsByUuid.put(actor.uuid, actor) - notifyListeners(ActorRegistered(address, actor)) + notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid))) } private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef) { typedActorsByUuid.put(actorRef.uuid, interface) + actorRef.start // register actorRef } /** @@ -79,7 +80,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag private[akka] def unregister(address: String) { val actor = actorsByAddress remove address actorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor)) + notifyListeners(ActorUnregistered(address, actor, None)) } /** @@ -89,8 +90,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag val address = actor.address actorsByAddress remove address actorsByUuid remove actor.uuid - typedActorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor)) + notifyListeners(ActorUnregistered(address, actor, Option(typedActorsByUuid remove actor.uuid))) } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 78f234c91a..6de5b85543 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -36,6 +36,7 @@ object TypedActor { } case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { + val impl = actor.actor.asInstanceOf[TypedActor[_, _]].me def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean @@ -157,7 +158,7 @@ object TypedActor { val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T] proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop proxy } diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala index c315742f3a..eae0dbaf36 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala @@ -5,6 +5,7 @@ package akka.camel import java.lang.reflect.Method +import java.lang.reflect.Proxy._ import akka.actor.{ TypedActor, ActorRef } @@ -12,6 +13,7 @@ import akka.actor.{ TypedActor, ActorRef } * @author Martin Krasser */ private[camel] object TypedConsumer { + /** * Applies a function f to actorRef if actorRef * references a typed consumer actor. A valid reference to a typed consumer actor is a @@ -21,18 +23,35 @@ private[camel] object TypedConsumer { * is called with the corresponding method instance and the return value is * added to a list which is then returned by this method. */ - def withTypedConsumer[T](actorRef: ActorRef)(f: Method ⇒ T): List[T] = { - if (!actorRef.actor.isInstanceOf[TypedActor]) Nil - else if (actorRef.homeAddress.isDefined) Nil - else { - val typedActor = actorRef.actor.asInstanceOf[TypedActor] - // TODO: support consumer annotation inheritance - // - visit overridden methods in superclasses - // - visit implemented method declarations in interfaces - val intfClass = typedActor.proxy.getClass - val implClass = typedActor.getClass - (for (m ← intfClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(m)) ++ - (for (m ← implClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(m)) + def withTypedConsumer[T](actorRef: ActorRef, typedActor: Option[AnyRef])(f: (AnyRef, Method) ⇒ T): List[T] = { + typedActor match { + case None ⇒ Nil + case Some(tc) ⇒ { + withConsumeAnnotatedMethodsOnInterfaces(tc, f) ++ + withConsumeAnnotatedMethodsonImplClass(tc, f) + } + } + } + + private implicit def class2ProxyClass(c: Class[_]) = new ProxyClass(c) + + private def withConsumeAnnotatedMethodsOnInterfaces[T](tc: AnyRef, f: (AnyRef, Method) ⇒ T): List[T] = for { + i ← tc.getClass.allInterfaces + m ← i.getDeclaredMethods.toList + if (m.isAnnotationPresent(classOf[consume])) + } yield f(tc, m) + + private def withConsumeAnnotatedMethodsonImplClass[T](tc: AnyRef, f: (AnyRef, Method) ⇒ T): List[T] = { + val implClass = getInvocationHandler(tc).asInstanceOf[TypedActor.TypedActorInvocationHandler].impl.asInstanceOf[AnyRef].getClass + for (m ← implClass.getDeclaredMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(tc, m) + + } + + private class ProxyClass(c: Class[_]) { + def allInterfaces: List[Class[_]] = allInterfaces(c.getInterfaces.toList) + def allInterfaces(is: List[Class[_]]): List[Class[_]] = is match { + case Nil ⇒ Nil + case x :: xs ⇒ x :: allInterfaces(x.getInterfaces.toList) ::: allInterfaces(xs) } } } diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala index 883b98afce..fae8426cbe 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala @@ -7,8 +7,8 @@ package akka.camel import java.lang.reflect.Method import akka.actor._ -import akka.event.EventHandler import akka.camel.component.TypedActorComponent +import akka.event.EventHandler /** * Concrete publish requestor that requests publication of typed consumer actor methods on @@ -19,8 +19,8 @@ import akka.camel.component.TypedActorComponent */ private[camel] class TypedConsumerPublishRequestor extends PublishRequestor { def receiveActorRegistryEvent = { - case ActorRegistered(actor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor)) deliverCurrentEvent(event) - case ActorUnregistered(actor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor)) deliverCurrentEvent(event) + case ActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event) + case ActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event) } } @@ -84,12 +84,12 @@ private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) */ private[camel] trait ConsumerMethodEvent extends ConsumerEvent { val actorRef: ActorRef + val typedActor: AnyRef val method: Method val uuid = actorRef.uuid.toString val methodName = method.getName val methodUuid = "%s_%s" format (uuid, methodName) - val typedActor = actorRef.actor.asInstanceOf[TypedActor].proxy lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance lazy val consumeAnnotation = method.getAnnotation(classOf[consume]) @@ -100,13 +100,13 @@ private[camel] trait ConsumerMethodEvent extends ConsumerEvent { * Event indicating that a typed consumer actor has been registered at the actor registry. For * each @consume annotated typed actor method a separate event is created. */ -private[camel] case class ConsumerMethodRegistered(actorRef: ActorRef, method: Method) extends ConsumerMethodEvent +private[camel] case class ConsumerMethodRegistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent /** * Event indicating that a typed consumer actor has been unregistered from the actor registry. For * each @consume annotated typed actor method a separate event is created. */ -private[camel] case class ConsumerMethodUnregistered(actorRef: ActorRef, method: Method) extends ConsumerMethodEvent +private[camel] case class ConsumerMethodUnregistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent /** * @author Martin Krasser @@ -116,9 +116,9 @@ private[camel] object ConsumerMethodRegistered { * Creates a list of ConsumerMethodRegistered event messages for a typed consumer actor or an empty * list if actorRef doesn't reference a typed consumer actor. */ - def eventsFor(actorRef: ActorRef): List[ConsumerMethodRegistered] = { - TypedConsumer.withTypedConsumer(actorRef: ActorRef) { m ⇒ - ConsumerMethodRegistered(actorRef, m) + def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodRegistered] = { + TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m) ⇒ + ConsumerMethodRegistered(actorRef, tc, m) } } } @@ -131,9 +131,9 @@ private[camel] object ConsumerMethodUnregistered { * Creates a list of ConsumerMethodUnregistered event messages for a typed consumer actor or an empty * list if actorRef doesn't reference a typed consumer actor. */ - def eventsFor(actorRef: ActorRef): List[ConsumerMethodUnregistered] = { - TypedConsumer.withTypedConsumer(actorRef) { m ⇒ - ConsumerMethodUnregistered(actorRef, m) + def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodUnregistered] = { + TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m) ⇒ + ConsumerMethodUnregistered(actorRef, tc, m) } } } diff --git a/akka-camel-typed/src/main/scala/akka/camel/component/TypedActorComponent.scala b/akka-camel-typed/src/main/scala/akka/camel/component/TypedActorComponent.scala index 5110867fa7..36d6c50516 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/component/TypedActorComponent.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/component/TypedActorComponent.scala @@ -65,10 +65,10 @@ class TypedActorHolder(uri: String, context: CamelContext, name: String) extends RegistryBean(context, name) { /** - * Returns an akka.camel.component.TypedActorInfo instance. + * Returns an akka.camel.component.BeanInfo instance. */ override def getBeanInfo: BeanInfo = - new TypedActorInfo(getContext, getBean.getClass, getParameterMappingStrategy) + new BeanInfo(getContext, getBean.getClass, getParameterMappingStrategy) /** * Obtains a typed actor from Actor.registry if the schema is @@ -80,39 +80,6 @@ class TypedActorHolder(uri: String, context: CamelContext, name: String) */ override def getBean: AnyRef = { val internal = uri.startsWith(TypedActorComponent.InternalSchema) - if (internal) Actor.registry.typedActorFor(uuidFrom(getName)) getOrElse null else super.getBean - } -} - -/** - * Typed actor meta information. - * - * @author Martin Krasser - */ -class TypedActorInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy) - extends BeanInfo(context, clazz, strategy) { - - /** - * Introspects AspectWerkz proxy classes. - * - * @param clazz AspectWerkz proxy class. - */ - protected override def introspect(clazz: Class[_]): Unit = { - - // TODO: fix target class detection in BeanInfo.introspect(Class) - // Camel assumes that classes containing a '$$' in the class name - // are classes generated with CGLIB. This conflicts with proxies - // created from interfaces with AspectWerkz. Once the fix is in - // place this method can be removed. - - for (method ← clazz.getDeclaredMethods) { - if (isValidMethod(clazz, method)) { - introspect(clazz, method) - } - } - val superclass = clazz.getSuperclass - if ((superclass ne null) && !superclass.equals(classOf[AnyRef])) { - introspect(superclass) - } + if (internal) Actor.registry.local.typedActorFor(uuidFrom(getName)) getOrElse null else super.getBean } } diff --git a/akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java b/akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java index cfa42a7521..89b3948b00 100644 --- a/akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java +++ b/akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java @@ -1,11 +1,9 @@ package akka.camel; -import akka.actor.TypedActor; - /** * @author Martin Krasser */ -public class SampleErrorHandlingTypedConsumerImpl extends TypedActor implements SampleErrorHandlingTypedConsumer { +public class SampleErrorHandlingTypedConsumerImpl implements SampleErrorHandlingTypedConsumer { public String willFail(String s) { throw new RuntimeException(String.format("error: %s", s)); diff --git a/akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumerImpl.java b/akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumerImpl.java index d7fb463b44..067fb4eda6 100644 --- a/akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumerImpl.java +++ b/akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumerImpl.java @@ -1,11 +1,9 @@ package akka.camel; -import akka.actor.TypedActor; - /** * @author Martin Krasser */ -public class SampleRemoteTypedConsumerImpl extends TypedActor implements SampleRemoteTypedConsumer { +public class SampleRemoteTypedConsumerImpl implements SampleRemoteTypedConsumer { public String foo(String s) { return String.format("remote typed actor: %s", s); diff --git a/akka-camel-typed/src/test/java/akka/camel/SampleTypedActorImpl.java b/akka-camel-typed/src/test/java/akka/camel/SampleTypedActorImpl.java index 773e3ec3ec..93d6cd9395 100644 --- a/akka-camel-typed/src/test/java/akka/camel/SampleTypedActorImpl.java +++ b/akka-camel-typed/src/test/java/akka/camel/SampleTypedActorImpl.java @@ -5,7 +5,7 @@ import akka.actor.TypedActor; /** * @author Martin Krasser */ -public class SampleTypedActorImpl extends TypedActor implements SampleTypedActor { +public class SampleTypedActorImpl implements SampleTypedActor { public String foo(String s) { return String.format("foo: %s", s); diff --git a/akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumerImpl.java b/akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumerImpl.java index 3bbe7a9442..8a402133f6 100644 --- a/akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumerImpl.java +++ b/akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumerImpl.java @@ -1,11 +1,9 @@ package akka.camel; -import akka.actor.TypedActor; - /** * @author Martin Krasser */ -public class SampleTypedConsumerImpl extends TypedActor implements SampleTypedConsumer { +public class SampleTypedConsumerImpl implements SampleTypedConsumer { public String m1(String b, String h) { return "m1: " + b + " " + h; diff --git a/akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumerImpl.java b/akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumerImpl.java index 27fbfdaa0d..fa4807eec4 100644 --- a/akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumerImpl.java +++ b/akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumerImpl.java @@ -1,11 +1,9 @@ package akka.camel; -import akka.actor.TypedActor; - /** * @author Martin Krasser */ -public class SampleTypedSingleConsumerImpl extends TypedActor implements SampleTypedSingleConsumer { +public class SampleTypedSingleConsumerImpl implements SampleTypedSingleConsumer { public void foo(String b) { } diff --git a/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java b/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java index 64e8197de8..64aa29ed54 100644 --- a/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java +++ b/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java @@ -1,7 +1,11 @@ package akka.camel; +import akka.actor.Actor; import akka.actor.TypedActor; +import akka.actor.TypedActor.Configuration; +import akka.dispatch.Dispatchers; import akka.japi.SideEffect; +import akka.util.FiniteDuration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -28,16 +32,18 @@ public class TypedConsumerJavaTestBase { @AfterClass public static void tearDownAfterClass() { stopCamelService(); - registry().shutdownAll(); + registry().local().shutdownAll(); } @Test public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() { getMandatoryService().awaitEndpointActivation(1, new SideEffect() { public void apply() { - consumer = TypedActor.newInstance( + consumer = TypedActor.typedActorOf( SampleErrorHandlingTypedConsumer.class, - SampleErrorHandlingTypedConsumerImpl.class); + SampleErrorHandlingTypedConsumerImpl.class, + new Configuration(new FiniteDuration(5000, "millis"), Dispatchers.defaultGlobalDispatcher() + )); } }); String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class); diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala index 406c5656c1..cc4ff0f0cf 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala @@ -7,6 +7,7 @@ import org.scalatest.junit.JUnitSuite import akka.actor._ import akka.actor.Actor._ +import akka.actor.TypedActor.Configuration._ import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount ⇒ SetExpectedTestMessageCount, _ } class TypedConsumerPublishRequestorTest extends JUnitSuite { @@ -33,14 +34,14 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @After def tearDown = { Actor.registry.removeListener(requestor); - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } @Test def shouldReceiveOneConsumerMethodRegisteredEvent = { Actor.registry.addListener(requestor) val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get - val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl]) + val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val event = (publisher !! GetRetainedMessage).as[ConsumerMethodRegistered].get assert(event.endpointUri === "direct:foo") @@ -50,7 +51,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerMethodUnregisteredEvent = { - val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl]) + val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration) val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get Actor.registry.addListener(requestor) TypedActor.stop(obj) @@ -65,7 +66,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { def shouldReceiveThreeConsumerMethodRegisteredEvents = { Actor.registry.addListener(requestor) val latch = (publisher !! SetExpectedTestMessageCount(3)).as[CountDownLatch].get - val obj = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered]) val events = (publisher !! request).as[List[ConsumerMethodRegistered]].get @@ -74,7 +75,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveThreeConsumerMethodUnregisteredEvents = { - val obj = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) val latch = (publisher !! SetExpectedTestMessageCount(3)).as[CountDownLatch].get Actor.registry.addListener(requestor) TypedActor.stop(obj) diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala index 0cc0073e2d..1692c8e6fc 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala @@ -7,6 +7,7 @@ import org.scalatest.matchers.MustMatchers import akka.actor.Actor._ import akka.actor._ +import akka.actor.TypedActor.Configuration._ /** * @author Martin Krasser @@ -18,13 +19,13 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa var service: CamelService = _ override protected def beforeAll = { - registry.shutdownAll + registry.local.shutdownAll service = CamelServiceManager.startCamelService } override protected def afterAll = { service.stop - registry.shutdownAll + registry.local.shutdownAll } "A responding, typed consumer" when { @@ -32,7 +33,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa "started" must { "support in-out message exchanges via its endpoints" in { service.awaitEndpointActivation(3) { - actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) } must be(true) mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal("m2: x y") mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal("m3: x y") @@ -62,7 +63,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa "started" must { "support in-out message exchanges via its endpoints" in { service.awaitEndpointActivation(2) { - actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl]) + actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], defaultConfiguration) } must be(true) mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal("foo: x") mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal("bar: x") @@ -91,7 +92,7 @@ object TypedConsumerScalaTest { def bar(s: String): String } - class TestTypedConsumerImpl extends TypedActor with TestTypedConsumer { + class TestTypedConsumerImpl extends TestTypedConsumer { def foo(s: String) = "foo: %s" format s @consume("direct:publish-test-4") def bar(s: String) = "bar: %s" format s diff --git a/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala b/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala index 04d08023a3..91058e3109 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala @@ -6,8 +6,8 @@ import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec } import akka.actor.{ Actor, TypedActor } +import akka.actor.TypedActor.Configuration._ import akka.camel._ -import akka.util.ReflectiveAccess.TypedActorModule /** * @author Martin Krasser @@ -19,10 +19,14 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll var typedConsumerUuid: String = _ override protected def beforeAll = { - val typedActor = TypedActor.newInstance(classOf[SampleTypedActor], classOf[SampleTypedActorImpl]) // not a consumer - val typedConsumer = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + val typedActor = TypedActor.typedActorOf( + classOf[SampleTypedActor], + classOf[SampleTypedActorImpl], defaultConfiguration) // not a consumer + val typedConsumer = TypedActor.typedActorOf( + classOf[SampleTypedConsumer], + classOf[SampleTypedConsumerImpl], defaultConfiguration) - typedConsumerUuid = TypedActorModule.typedActorObjectInstance.get.actorFor(typedConsumer).get.uuid.toString + typedConsumerUuid = TypedActor.getActorRefFor(typedConsumer).uuid.toString val registry = new SimpleRegistry // external registration @@ -35,7 +39,7 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll override protected def afterAll = { CamelContextManager.stop - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } feature("Communicate with an internally-registered typed actor using typed-actor-internal endpoint URIs") { diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala index fb15c9d1fc..507124ba2f 100644 --- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala @@ -20,8 +20,8 @@ import akka.event.EventHandler */ private[camel] class ConsumerPublishRequestor extends PublishRequestor { def receiveActorRegistryEvent = { - case ActorRegistered(_, actor) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) - case ActorUnregistered(_, actor) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) + case ActorRegistered(_, actor, None) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) + case ActorUnregistered(_, actor, None) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) } } diff --git a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala index 7083cdbe6e..7c1ace2b77 100644 --- a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala +++ b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala @@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor { * @author Martin Krasser */ private[camel] object PublishRequestor { - def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor) + def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor, None) } /** diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index 4de98f335f..2cf4e3400f 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -36,7 +36,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerRegisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorRegistered(consumer.address, consumer) + requestor ! ActorRegistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer]))) @@ -45,7 +45,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerUnregisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorUnregistered(consumer.address, consumer) + requestor ! ActorUnregistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer]))) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 7f3b8de710..7e4fb8e405 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -75,11 +75,13 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val jsr311ModuleConfig = ModuleConfiguration("javax.ws.rs", "jsr311-api", sbt.DefaultMavenRepository) lazy val zookeeperModuleConfig = ModuleConfiguration("org.apache.hadoop.zookeeper", AkkaRepo) lazy val zkclientModuleConfig = ModuleConfiguration("zkclient", AkkaRepo) + lazy val camelModuleConfig = ModuleConfiguration("org.apache.camel", "camel-core", AkkaRepo) // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- lazy val CAMEL_VERSION = "2.7.1" + lazy val CAMEL_PATCH_VERSION = "2.7.1.1" lazy val SPRING_VERSION = "3.0.5.RELEASE" lazy val JACKSON_VERSION = "1.8.0" lazy val JERSEY_VERSION = "1.3" @@ -103,7 +105,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2 - lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2 + lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_PATCH_VERSION % "compile" //ApacheV2 lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2 @@ -170,7 +172,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_remote) lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j) - //lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel) + lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel) //lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_remote, akka_actor, akka_camel) lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), akka_stm, akka_remote, akka_http, akka_slf4j, akka_camel)