Improve API to wait for endpoint activation/deactivation. Closes #472
This commit is contained in:
parent
769786d544
commit
96b8b455ed
5 changed files with 77 additions and 68 deletions
|
|
@ -4,6 +4,7 @@
|
|||
package se.scalablesolutions.akka.camel
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.apache.camel.CamelContext
|
||||
|
||||
|
|
@ -94,12 +95,34 @@ trait CamelService extends Bootable with Logging {
|
|||
CamelContextManager.stop
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for an expected number (<code>count</code>) of endpoints to be activated
|
||||
* during execution of <code>f</code>. The wait-timeout is by default 10 seconds.
|
||||
* Other timeout values can be set via the <code>timeout</code> and <code>timeUnit</code>
|
||||
* parameters.
|
||||
*/
|
||||
def awaitEndpointActivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = {
|
||||
val activation = expectEndpointActivationCount(count)
|
||||
f; activation.await(timeout, timeUnit)
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for an expected number (<code>count</code>) of endpoints to be de-activated
|
||||
* during execution of <code>f</code>. The wait-timeout is by default 10 seconds.
|
||||
* Other timeout values can be set via the <code>timeout</code> and <code>timeUnit</code>
|
||||
* parameters.
|
||||
*/
|
||||
def awaitEndpointDeactivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = {
|
||||
val activation = expectEndpointDeactivationCount(count)
|
||||
f; activation.await(timeout, timeUnit)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets an expectation on the number of upcoming endpoint activations and returns
|
||||
* a CountDownLatch that can be used to wait for the activations to occur. Endpoint
|
||||
* a CountDownLatch that can be used to wait for the activations to occur. Endpoint
|
||||
* activations that occurred in the past are not considered.
|
||||
*/
|
||||
def expectEndpointActivationCount(count: Int): CountDownLatch =
|
||||
private def expectEndpointActivationCount(count: Int): CountDownLatch =
|
||||
(consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get
|
||||
|
||||
/**
|
||||
|
|
@ -107,7 +130,7 @@ trait CamelService extends Bootable with Logging {
|
|||
* a CountDownLatch that can be used to wait for the de-activations to occur. Endpoint
|
||||
* de-activations that occurred in the past are not considered.
|
||||
*/
|
||||
def expectEndpointDeactivationCount(count: Int): CountDownLatch =
|
||||
private def expectEndpointDeactivationCount(count: Int): CountDownLatch =
|
||||
(consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,12 +28,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
// start consumer publisher, otherwise we cannot set message
|
||||
// count expectations in the next step (needed for testing only).
|
||||
service.consumerPublisher.start
|
||||
// set expectations on publish count
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
// start the CamelService
|
||||
service.start
|
||||
// await publication of first test consumer
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(1) {
|
||||
service.start
|
||||
} must be (true)
|
||||
}
|
||||
|
||||
override protected def afterAll = {
|
||||
|
|
@ -55,9 +52,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
}
|
||||
"started" must {
|
||||
"support an in-out message exchange via its endpoint" in {
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
consumer.start
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(1) {
|
||||
consumer.start
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") must equal ("received msg2")
|
||||
}
|
||||
"have an associated endpoint in the CamelContext" in {
|
||||
|
|
@ -66,9 +63,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
}
|
||||
"stopped" must {
|
||||
"not support an in-out message exchange via its endpoint" in {
|
||||
val latch = service.expectEndpointDeactivationCount(1)
|
||||
consumer.stop
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointDeactivation(1) {
|
||||
consumer.stop
|
||||
} must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
mandatoryTemplate.requestBody("direct:publish-test-2", "msg2")
|
||||
}
|
||||
|
|
@ -80,9 +77,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
var actor: SampleTypedConsumer = null
|
||||
"started" must {
|
||||
"support in-out message exchanges via its endpoints" in {
|
||||
val latch = service.expectEndpointActivationCount(3)
|
||||
actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(3) {
|
||||
actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl])
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal ("m2: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal ("m3: x y")
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m4", "x", "test", "y") must equal ("m4: x y")
|
||||
|
|
@ -90,9 +87,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
}
|
||||
"stopped" must {
|
||||
"not support in-out message exchanges via its endpoints" in {
|
||||
val latch = service.expectEndpointDeactivationCount(3)
|
||||
TypedActor.stop(actor)
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointDeactivation(3) {
|
||||
TypedActor.stop(actor)
|
||||
} must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y")
|
||||
}
|
||||
|
|
@ -110,18 +107,18 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
var actor: TestTypedConsumer = null
|
||||
"started" must {
|
||||
"support in-out message exchanges via its endpoints" in {
|
||||
val latch = service.expectEndpointActivationCount(2)
|
||||
actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl])
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(2) {
|
||||
actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl])
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal ("foo: x")
|
||||
mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal ("bar: x")
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
"not support in-out message exchanges via its endpoints" in {
|
||||
val latch = service.expectEndpointDeactivationCount(2)
|
||||
TypedActor.stop(actor)
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointDeactivation(2) {
|
||||
TypedActor.stop(actor)
|
||||
} must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
mandatoryTemplate.requestBody("direct:publish-test-3", "x")
|
||||
}
|
||||
|
|
@ -136,17 +133,17 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
val consumer = UntypedActor.actorOf(classOf[SampleUntypedConsumer])
|
||||
"started" must {
|
||||
"support an in-out message exchange via its endpoint" in {
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
consumer.start
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(1) {
|
||||
consumer.start
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBodyAndHeader("direct:test-untyped-consumer", "x", "test", "y") must equal ("x y")
|
||||
}
|
||||
}
|
||||
"stopped" must {
|
||||
"not support an in-out message exchange via its endpoint" in {
|
||||
val latch = service.expectEndpointDeactivationCount(1)
|
||||
consumer.stop
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointDeactivation(1) {
|
||||
consumer.stop
|
||||
} must be (true)
|
||||
intercept[CamelExecutionException] {
|
||||
mandatoryTemplate.sendBodyAndHeader("direct:test-untyped-consumer", "blah", "test", "blub")
|
||||
}
|
||||
|
|
@ -157,9 +154,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
"A non-responding, blocking consumer" when {
|
||||
"receiving an in-out message exchange" must {
|
||||
"lead to a TimeoutException" in {
|
||||
val latch = service.expectEndpointActivationCount(1)
|
||||
actorOf(new TestBlocker("direct:publish-test-5")).start
|
||||
latch.await(5000, TimeUnit.MILLISECONDS) must be (true)
|
||||
service.awaitEndpointActivation(1) {
|
||||
actorOf(new TestBlocker("direct:publish-test-5")).start
|
||||
} must be (true)
|
||||
|
||||
try {
|
||||
mandatoryTemplate.requestBody("direct:publish-test-5", "msg3")
|
||||
|
|
|
|||
|
|
@ -45,9 +45,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
val consumer = actorOf[RemoteConsumer].start
|
||||
|
||||
when("remote consumer publication is triggered")
|
||||
var latch = mandatoryService.expectEndpointActivationCount(1)
|
||||
consumer !! "init"
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert(mandatoryService.awaitEndpointActivation(1) {
|
||||
consumer !! "init"
|
||||
})
|
||||
|
||||
then("the published consumer is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test")
|
||||
|
|
@ -61,10 +61,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
val consumer = TypedActor.newRemoteInstance(classOf[SampleRemoteTypedConsumer], classOf[SampleRemoteTypedConsumerImpl], host, port)
|
||||
|
||||
when("remote typed consumer publication is triggered")
|
||||
var latch = mandatoryService.expectEndpointActivationCount(1)
|
||||
consumer.foo("init")
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
assert(mandatoryService.awaitEndpointActivation(1) {
|
||||
consumer.foo("init")
|
||||
})
|
||||
then("the published method is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test")
|
||||
assert(response === "remote typed actor: test")
|
||||
|
|
@ -77,10 +76,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh
|
|||
val consumer = UntypedActor.actorOf(classOf[SampleRemoteUntypedConsumer]).start
|
||||
|
||||
when("remote untyped consumer publication is triggered")
|
||||
var latch = mandatoryService.expectEndpointActivationCount(1)
|
||||
consumer.sendRequestReply(Message("init", Map("test" -> "init")))
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
|
||||
assert(mandatoryService.awaitEndpointActivation(1) {
|
||||
consumer.sendRequestReply(Message("init", Map("test" -> "init")))
|
||||
})
|
||||
then("the published untyped consumer is accessible via its endpoint URI")
|
||||
val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b")
|
||||
assert(response === "a b")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue