Tests for stopping active object endpoints; minor refactoring in ConsumerPublisher

This commit is contained in:
Martin Krasser 2010-07-05 12:48:26 +02:00
parent ae6bf7aaa0
commit 460dcfe516
3 changed files with 73 additions and 13 deletions

View file

@ -303,6 +303,26 @@ private[camel] object ConsumerUnregistered {
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerMethod {
/**
* Applies a function <code>f</code> to each consumer method of <code>activeObject</code> and
* returns the function results as a list. A consumer method is one that is annotated with
* <code>@consume</code>. If <code>activeObject</code> is a proxy for a remote active object
* <code>f</code> is never called and <code>Nil</code> is returned.
*/
def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield f(m)
}
}
/**
* @author Martin Krasser
*/
@ -313,12 +333,9 @@ private[camel] object ConsumerMethodRegistered {
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
// TODO: support consumer annotation inheritance
// - visit overridden methods in superclasses
// - visit implemented method declarations in interfaces
if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
@ -329,9 +346,9 @@ private[camel] object ConsumerMethodUnregistered {
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
if (init.remoteAddress.isDefined) Nil
else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
yield ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}

View file

@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
feature("Unpublish registered consumer actor from the global CamelContext") {
scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") {
scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
then("the direct endpoint falls back to its default behaviour and returns the original message")
then("the direct-endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
}
}
@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.newInstance(classOf[PojoBase])
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
// cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
}
}
feature("Unpublish active object method from the global CamelContext") {
scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
given("an active object that has been stopped")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("the direct-endpoints fall back to their default behaviour and return the original message")
assert(response1 === "x")
assert(response2 === "x")
assert(response3 === "x")
}
}
}

View file

@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite {
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerMethodUnregisteredEvent = {
val obj = ActiveObject.newInstance(classOf[PojoSingle])
val init = AspectInit(classOf[PojoSingle], null, None, 1000)
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! AspectInitUnregistered(obj, init)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered]
assert(event.init === init)
assert(event.uri === "direct:foo")
assert(event.activeObject === obj)
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer)