Migrate akka-camel-typed to new typed actor implementation.
This commit is contained in:
parent
3d7a717b06
commit
efd9a895e8
18 changed files with 98 additions and 105 deletions
|
|
@ -21,8 +21,8 @@ import akka.serialization._
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed trait ActorRegistryEvent
|
||||
case class ActorRegistered(address: String, actor: ActorRef) extends ActorRegistryEvent
|
||||
case class ActorUnregistered(address: String, actor: ActorRef) extends ActorRegistryEvent
|
||||
case class ActorRegistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
|
||||
case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Option[AnyRef]) extends ActorRegistryEvent
|
||||
|
||||
/**
|
||||
* Registry holding all Actor instances in the whole system.
|
||||
|
|
@ -66,11 +66,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
|
||||
actorsByAddress.put(address, actor)
|
||||
actorsByUuid.put(actor.uuid, actor)
|
||||
notifyListeners(ActorRegistered(address, actor))
|
||||
notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid)))
|
||||
}
|
||||
|
||||
private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef) {
|
||||
typedActorsByUuid.put(actorRef.uuid, interface)
|
||||
actorRef.start // register actorRef
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -79,7 +80,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
private[akka] def unregister(address: String) {
|
||||
val actor = actorsByAddress remove address
|
||||
actorsByUuid remove actor.uuid
|
||||
notifyListeners(ActorUnregistered(address, actor))
|
||||
notifyListeners(ActorUnregistered(address, actor, None))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -89,8 +90,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
val address = actor.address
|
||||
actorsByAddress remove address
|
||||
actorsByUuid remove actor.uuid
|
||||
typedActorsByUuid remove actor.uuid
|
||||
notifyListeners(ActorUnregistered(address, actor))
|
||||
notifyListeners(ActorUnregistered(address, actor, Option(typedActorsByUuid remove actor.uuid)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ object TypedActor {
|
|||
}
|
||||
|
||||
case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
|
||||
val impl = actor.actor.asInstanceOf[TypedActor[_, _]].me
|
||||
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
|
||||
case "toString" ⇒ actor.toString
|
||||
case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
|
||||
|
|
@ -157,7 +158,7 @@ object TypedActor {
|
|||
|
||||
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T]
|
||||
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
||||
Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
|
||||
Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
|
||||
proxy
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.camel
|
||||
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy._
|
||||
|
||||
import akka.actor.{ TypedActor, ActorRef }
|
||||
|
||||
|
|
@ -12,6 +13,7 @@ import akka.actor.{ TypedActor, ActorRef }
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] object TypedConsumer {
|
||||
|
||||
/**
|
||||
* Applies a function <code>f</code> to <code>actorRef</code> if <code>actorRef</code>
|
||||
* references a typed consumer actor. A valid reference to a typed consumer actor is a
|
||||
|
|
@ -21,18 +23,35 @@ private[camel] object TypedConsumer {
|
|||
* is called with the corresponding <code>method</code> instance and the return value is
|
||||
* added to a list which is then returned by this method.
|
||||
*/
|
||||
def withTypedConsumer[T](actorRef: ActorRef)(f: Method ⇒ T): List[T] = {
|
||||
if (!actorRef.actor.isInstanceOf[TypedActor]) Nil
|
||||
else if (actorRef.homeAddress.isDefined) Nil
|
||||
else {
|
||||
val typedActor = actorRef.actor.asInstanceOf[TypedActor]
|
||||
// TODO: support consumer annotation inheritance
|
||||
// - visit overridden methods in superclasses
|
||||
// - visit implemented method declarations in interfaces
|
||||
val intfClass = typedActor.proxy.getClass
|
||||
val implClass = typedActor.getClass
|
||||
(for (m ← intfClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(m)) ++
|
||||
(for (m ← implClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(m))
|
||||
def withTypedConsumer[T](actorRef: ActorRef, typedActor: Option[AnyRef])(f: (AnyRef, Method) ⇒ T): List[T] = {
|
||||
typedActor match {
|
||||
case None ⇒ Nil
|
||||
case Some(tc) ⇒ {
|
||||
withConsumeAnnotatedMethodsOnInterfaces(tc, f) ++
|
||||
withConsumeAnnotatedMethodsonImplClass(tc, f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private implicit def class2ProxyClass(c: Class[_]) = new ProxyClass(c)
|
||||
|
||||
private def withConsumeAnnotatedMethodsOnInterfaces[T](tc: AnyRef, f: (AnyRef, Method) ⇒ T): List[T] = for {
|
||||
i ← tc.getClass.allInterfaces
|
||||
m ← i.getDeclaredMethods.toList
|
||||
if (m.isAnnotationPresent(classOf[consume]))
|
||||
} yield f(tc, m)
|
||||
|
||||
private def withConsumeAnnotatedMethodsonImplClass[T](tc: AnyRef, f: (AnyRef, Method) ⇒ T): List[T] = {
|
||||
val implClass = getInvocationHandler(tc).asInstanceOf[TypedActor.TypedActorInvocationHandler].impl.asInstanceOf[AnyRef].getClass
|
||||
for (m ← implClass.getDeclaredMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(tc, m)
|
||||
|
||||
}
|
||||
|
||||
private class ProxyClass(c: Class[_]) {
|
||||
def allInterfaces: List[Class[_]] = allInterfaces(c.getInterfaces.toList)
|
||||
def allInterfaces(is: List[Class[_]]): List[Class[_]] = is match {
|
||||
case Nil ⇒ Nil
|
||||
case x :: xs ⇒ x :: allInterfaces(x.getInterfaces.toList) ::: allInterfaces(xs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ package akka.camel
|
|||
import java.lang.reflect.Method
|
||||
|
||||
import akka.actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.camel.component.TypedActorComponent
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* Concrete publish requestor that requests publication of typed consumer actor methods on
|
||||
|
|
@ -19,8 +19,8 @@ import akka.camel.component.TypedActorComponent
|
|||
*/
|
||||
private[camel] class TypedConsumerPublishRequestor extends PublishRequestor {
|
||||
def receiveActorRegistryEvent = {
|
||||
case ActorRegistered(actor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(actor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor)) deliverCurrentEvent(event)
|
||||
case ActorRegistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodRegistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(_, actor, typedActor) ⇒ for (event ← ConsumerMethodUnregistered.eventsFor(actor, typedActor)) deliverCurrentEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -84,12 +84,12 @@ private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered)
|
|||
*/
|
||||
private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
|
||||
val actorRef: ActorRef
|
||||
val typedActor: AnyRef
|
||||
val method: Method
|
||||
|
||||
val uuid = actorRef.uuid.toString
|
||||
val methodName = method.getName
|
||||
val methodUuid = "%s_%s" format (uuid, methodName)
|
||||
val typedActor = actorRef.actor.asInstanceOf[TypedActor].proxy
|
||||
|
||||
lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance
|
||||
lazy val consumeAnnotation = method.getAnnotation(classOf[consume])
|
||||
|
|
@ -100,13 +100,13 @@ private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
|
|||
* Event indicating that a typed consumer actor has been registered at the actor registry. For
|
||||
* each <code>@consume</code> annotated typed actor method a separate event is created.
|
||||
*/
|
||||
private[camel] case class ConsumerMethodRegistered(actorRef: ActorRef, method: Method) extends ConsumerMethodEvent
|
||||
private[camel] case class ConsumerMethodRegistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent
|
||||
|
||||
/**
|
||||
* Event indicating that a typed consumer actor has been unregistered from the actor registry. For
|
||||
* each <code>@consume</code> annotated typed actor method a separate event is created.
|
||||
*/
|
||||
private[camel] case class ConsumerMethodUnregistered(actorRef: ActorRef, method: Method) extends ConsumerMethodEvent
|
||||
private[camel] case class ConsumerMethodUnregistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -116,9 +116,9 @@ private[camel] object ConsumerMethodRegistered {
|
|||
* Creates a list of ConsumerMethodRegistered event messages for a typed consumer actor or an empty
|
||||
* list if <code>actorRef</code> doesn't reference a typed consumer actor.
|
||||
*/
|
||||
def eventsFor(actorRef: ActorRef): List[ConsumerMethodRegistered] = {
|
||||
TypedConsumer.withTypedConsumer(actorRef: ActorRef) { m ⇒
|
||||
ConsumerMethodRegistered(actorRef, m)
|
||||
def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodRegistered] = {
|
||||
TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m) ⇒
|
||||
ConsumerMethodRegistered(actorRef, tc, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -131,9 +131,9 @@ private[camel] object ConsumerMethodUnregistered {
|
|||
* Creates a list of ConsumerMethodUnregistered event messages for a typed consumer actor or an empty
|
||||
* list if <code>actorRef</code> doesn't reference a typed consumer actor.
|
||||
*/
|
||||
def eventsFor(actorRef: ActorRef): List[ConsumerMethodUnregistered] = {
|
||||
TypedConsumer.withTypedConsumer(actorRef) { m ⇒
|
||||
ConsumerMethodUnregistered(actorRef, m)
|
||||
def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodUnregistered] = {
|
||||
TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m) ⇒
|
||||
ConsumerMethodUnregistered(actorRef, tc, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,10 +65,10 @@ class TypedActorHolder(uri: String, context: CamelContext, name: String)
|
|||
extends RegistryBean(context, name) {
|
||||
|
||||
/**
|
||||
* Returns an <code>akka.camel.component.TypedActorInfo</code> instance.
|
||||
* Returns an <code>akka.camel.component.BeanInfo</code> instance.
|
||||
*/
|
||||
override def getBeanInfo: BeanInfo =
|
||||
new TypedActorInfo(getContext, getBean.getClass, getParameterMappingStrategy)
|
||||
new BeanInfo(getContext, getBean.getClass, getParameterMappingStrategy)
|
||||
|
||||
/**
|
||||
* Obtains a typed actor from <code>Actor.registry</code> if the schema is
|
||||
|
|
@ -80,39 +80,6 @@ class TypedActorHolder(uri: String, context: CamelContext, name: String)
|
|||
*/
|
||||
override def getBean: AnyRef = {
|
||||
val internal = uri.startsWith(TypedActorComponent.InternalSchema)
|
||||
if (internal) Actor.registry.typedActorFor(uuidFrom(getName)) getOrElse null else super.getBean
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Typed actor meta information.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
class TypedActorInfo(context: CamelContext, clazz: Class[_], strategy: ParameterMappingStrategy)
|
||||
extends BeanInfo(context, clazz, strategy) {
|
||||
|
||||
/**
|
||||
* Introspects AspectWerkz proxy classes.
|
||||
*
|
||||
* @param clazz AspectWerkz proxy class.
|
||||
*/
|
||||
protected override def introspect(clazz: Class[_]): Unit = {
|
||||
|
||||
// TODO: fix target class detection in BeanInfo.introspect(Class)
|
||||
// Camel assumes that classes containing a '$$' in the class name
|
||||
// are classes generated with CGLIB. This conflicts with proxies
|
||||
// created from interfaces with AspectWerkz. Once the fix is in
|
||||
// place this method can be removed.
|
||||
|
||||
for (method ← clazz.getDeclaredMethods) {
|
||||
if (isValidMethod(clazz, method)) {
|
||||
introspect(clazz, method)
|
||||
}
|
||||
}
|
||||
val superclass = clazz.getSuperclass
|
||||
if ((superclass ne null) && !superclass.equals(classOf[AnyRef])) {
|
||||
introspect(superclass)
|
||||
}
|
||||
if (internal) Actor.registry.local.typedActorFor(uuidFrom(getName)) getOrElse null else super.getBean
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.TypedActor;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleErrorHandlingTypedConsumerImpl extends TypedActor implements SampleErrorHandlingTypedConsumer {
|
||||
public class SampleErrorHandlingTypedConsumerImpl implements SampleErrorHandlingTypedConsumer {
|
||||
|
||||
public String willFail(String s) {
|
||||
throw new RuntimeException(String.format("error: %s", s));
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.TypedActor;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleRemoteTypedConsumerImpl extends TypedActor implements SampleRemoteTypedConsumer {
|
||||
public class SampleRemoteTypedConsumerImpl implements SampleRemoteTypedConsumer {
|
||||
|
||||
public String foo(String s) {
|
||||
return String.format("remote typed actor: %s", s);
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import akka.actor.TypedActor;
|
|||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleTypedActorImpl extends TypedActor implements SampleTypedActor {
|
||||
public class SampleTypedActorImpl implements SampleTypedActor {
|
||||
|
||||
public String foo(String s) {
|
||||
return String.format("foo: %s", s);
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.TypedActor;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleTypedConsumerImpl extends TypedActor implements SampleTypedConsumer {
|
||||
public class SampleTypedConsumerImpl implements SampleTypedConsumer {
|
||||
|
||||
public String m1(String b, String h) {
|
||||
return "m1: " + b + " " + h;
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.TypedActor;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleTypedSingleConsumerImpl extends TypedActor implements SampleTypedSingleConsumer {
|
||||
public class SampleTypedSingleConsumerImpl implements SampleTypedSingleConsumer {
|
||||
|
||||
public void foo(String b) {
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.Actor;
|
||||
import akka.actor.TypedActor;
|
||||
import akka.actor.TypedActor.Configuration;
|
||||
import akka.dispatch.Dispatchers;
|
||||
import akka.japi.SideEffect;
|
||||
import akka.util.FiniteDuration;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
|
@ -28,16 +32,18 @@ public class TypedConsumerJavaTestBase {
|
|||
@AfterClass
|
||||
public static void tearDownAfterClass() {
|
||||
stopCamelService();
|
||||
registry().shutdownAll();
|
||||
registry().local().shutdownAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() {
|
||||
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
|
||||
public void apply() {
|
||||
consumer = TypedActor.newInstance(
|
||||
consumer = TypedActor.typedActorOf(
|
||||
SampleErrorHandlingTypedConsumer.class,
|
||||
SampleErrorHandlingTypedConsumerImpl.class);
|
||||
SampleErrorHandlingTypedConsumerImpl.class,
|
||||
new Configuration(new FiniteDuration(5000, "millis"), Dispatchers.defaultGlobalDispatcher()
|
||||
));
|
||||
}
|
||||
});
|
||||
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class);
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.TypedActor.Configuration._
|
||||
import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount ⇒ SetExpectedTestMessageCount, _ }
|
||||
|
||||
class TypedConsumerPublishRequestorTest extends JUnitSuite {
|
||||
|
|
@ -33,14 +34,14 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
|
|||
@After
|
||||
def tearDown = {
|
||||
Actor.registry.removeListener(requestor);
|
||||
Actor.registry.shutdownAll
|
||||
Actor.registry.local.shutdownAll
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldReceiveOneConsumerMethodRegisteredEvent = {
|
||||
Actor.registry.addListener(requestor)
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
|
||||
val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl])
|
||||
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val event = (publisher !! GetRetainedMessage).as[ConsumerMethodRegistered].get
|
||||
assert(event.endpointUri === "direct:foo")
|
||||
|
|
@ -50,7 +51,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldReceiveOneConsumerMethodUnregisteredEvent = {
|
||||
val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl])
|
||||
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration)
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
|
||||
Actor.registry.addListener(requestor)
|
||||
TypedActor.stop(obj)
|
||||
|
|
@ -65,7 +66,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
|
|||
def shouldReceiveThreeConsumerMethodRegisteredEvents = {
|
||||
Actor.registry.addListener(requestor)
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(3)).as[CountDownLatch].get
|
||||
val obj = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered])
|
||||
val events = (publisher !! request).as[List[ConsumerMethodRegistered]].get
|
||||
|
|
@ -74,7 +75,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldReceiveThreeConsumerMethodUnregisteredEvents = {
|
||||
val obj = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(3)).as[CountDownLatch].get
|
||||
Actor.registry.addListener(requestor)
|
||||
TypedActor.stop(obj)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor._
|
||||
import akka.actor.TypedActor.Configuration._
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -18,13 +19,13 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa
|
|||
var service: CamelService = _
|
||||
|
||||
override protected def beforeAll = {
|
||||
registry.shutdownAll
|
||||
registry.local.shutdownAll
|
||||
service = CamelServiceManager.startCamelService
|
||||
}
|
||||
|
||||
override protected def afterAll = {
|
||||
service.stop
|
||||
registry.shutdownAll
|
||||
registry.local.shutdownAll
|
||||
}
|
||||
|
||||
"A responding, typed consumer" when {
|
||||
|
|
@ -32,7 +33,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa
|
|||
"started" must {
|
||||
"support in-out message exchanges via its endpoints" in {
|
||||
service.awaitEndpointActivation(3) {
|
||||
actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
|
||||
} must be(true)
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal("m2: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal("m3: x y")
|
||||
|
|
@ -62,7 +63,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa
|
|||
"started" must {
|
||||
"support in-out message exchanges via its endpoints" in {
|
||||
service.awaitEndpointActivation(2) {
|
||||
actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl])
|
||||
actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], defaultConfiguration)
|
||||
} must be(true)
|
||||
mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal("foo: x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal("bar: x")
|
||||
|
|
@ -91,7 +92,7 @@ object TypedConsumerScalaTest {
|
|||
def bar(s: String): String
|
||||
}
|
||||
|
||||
class TestTypedConsumerImpl extends TypedActor with TestTypedConsumer {
|
||||
class TestTypedConsumerImpl extends TestTypedConsumer {
|
||||
def foo(s: String) = "foo: %s" format s
|
||||
@consume("direct:publish-test-4")
|
||||
def bar(s: String) = "bar: %s" format s
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@ import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry }
|
|||
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec }
|
||||
|
||||
import akka.actor.{ Actor, TypedActor }
|
||||
import akka.actor.TypedActor.Configuration._
|
||||
import akka.camel._
|
||||
import akka.util.ReflectiveAccess.TypedActorModule
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
|
|
@ -19,10 +19,14 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
var typedConsumerUuid: String = _
|
||||
|
||||
override protected def beforeAll = {
|
||||
val typedActor = TypedActor.newInstance(classOf[SampleTypedActor], classOf[SampleTypedActorImpl]) // not a consumer
|
||||
val typedConsumer = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
val typedActor = TypedActor.typedActorOf(
|
||||
classOf[SampleTypedActor],
|
||||
classOf[SampleTypedActorImpl], defaultConfiguration) // not a consumer
|
||||
val typedConsumer = TypedActor.typedActorOf(
|
||||
classOf[SampleTypedConsumer],
|
||||
classOf[SampleTypedConsumerImpl], defaultConfiguration)
|
||||
|
||||
typedConsumerUuid = TypedActorModule.typedActorObjectInstance.get.actorFor(typedConsumer).get.uuid.toString
|
||||
typedConsumerUuid = TypedActor.getActorRefFor(typedConsumer).uuid.toString
|
||||
|
||||
val registry = new SimpleRegistry
|
||||
// external registration
|
||||
|
|
@ -35,7 +39,7 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
|
|||
|
||||
override protected def afterAll = {
|
||||
CamelContextManager.stop
|
||||
Actor.registry.shutdownAll
|
||||
Actor.registry.local.shutdownAll
|
||||
}
|
||||
|
||||
feature("Communicate with an internally-registered typed actor using typed-actor-internal endpoint URIs") {
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ import akka.event.EventHandler
|
|||
*/
|
||||
private[camel] class ConsumerPublishRequestor extends PublishRequestor {
|
||||
def receiveActorRegistryEvent = {
|
||||
case ActorRegistered(_, actor) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(_, actor) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
|
||||
case ActorRegistered(_, actor, None) ⇒ for (event ← ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(_, actor, None) ⇒ for (event ← ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor {
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] object PublishRequestor {
|
||||
def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor)
|
||||
def pastActorRegisteredEvents = for (actor ← Actor.registry.local.actors) yield ActorRegistered(actor.address, actor, None)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
|
|||
@Test
|
||||
def shouldReceiveOneConsumerRegisteredEvent = {
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
|
||||
requestor ! ActorRegistered(consumer.address, consumer)
|
||||
requestor ! ActorRegistered(consumer.address, consumer, None)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert((publisher !! GetRetainedMessage) ===
|
||||
Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
|
||||
|
|
@ -45,7 +45,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
|
|||
@Test
|
||||
def shouldReceiveOneConsumerUnregisteredEvent = {
|
||||
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
|
||||
requestor ! ActorUnregistered(consumer.address, consumer)
|
||||
requestor ! ActorUnregistered(consumer.address, consumer, None)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert((publisher !! GetRetainedMessage) ===
|
||||
Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
|
||||
|
|
|
|||
|
|
@ -75,11 +75,13 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
lazy val jsr311ModuleConfig = ModuleConfiguration("javax.ws.rs", "jsr311-api", sbt.DefaultMavenRepository)
|
||||
lazy val zookeeperModuleConfig = ModuleConfiguration("org.apache.hadoop.zookeeper", AkkaRepo)
|
||||
lazy val zkclientModuleConfig = ModuleConfiguration("zkclient", AkkaRepo)
|
||||
lazy val camelModuleConfig = ModuleConfiguration("org.apache.camel", "camel-core", AkkaRepo)
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// Versions
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
lazy val CAMEL_VERSION = "2.7.1"
|
||||
lazy val CAMEL_PATCH_VERSION = "2.7.1.1"
|
||||
lazy val SPRING_VERSION = "3.0.5.RELEASE"
|
||||
lazy val JACKSON_VERSION = "1.8.0"
|
||||
lazy val JERSEY_VERSION = "1.3"
|
||||
|
|
@ -103,7 +105,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
|
||||
lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD
|
||||
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
|
||||
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2
|
||||
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_PATCH_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
|
||||
lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
|
||||
|
|
@ -170,7 +172,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
lazy val akka_durable_mailboxes = project("akka-durable-mailboxes", "akka-durable-mailboxes", new AkkaDurableMailboxesParentProject(_), akka_remote)
|
||||
|
||||
lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_actor, akka_slf4j)
|
||||
//lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel)
|
||||
lazy val akka_camel_typed = project("akka-camel-typed", "akka-camel-typed", new AkkaCamelTypedProject(_), akka_actor, akka_slf4j, akka_camel)
|
||||
//lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_remote, akka_actor, akka_camel)
|
||||
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), akka_stm, akka_remote, akka_http, akka_slf4j, akka_camel)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue