From ecf970dd4dcf460c5d3753f890761cd3f523b15d Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Wed, 3 Oct 2012 14:21:06 +0200 Subject: [PATCH] CAMEL: Used endpoint replyTimeout in processExchangeAdapter, fixed ActorProducerTest which did not wait for completion --- .../internal/component/ActorComponent.scala | 2 +- .../internal/component/ActorProducerTest.scala | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 2a1be08354..e9d708ec2a 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -135,7 +135,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = { val isDone = new CountDownLatch(1) processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } }) - isDone.await(camel.settings.ReplyTimeout.toMillis, TimeUnit.MILLISECONDS) + isDone.await(endpoint.replyTimeout.toMillis, TimeUnit.MILLISECONDS) } /** diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 8f7845d3ff..b059903b75 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -24,11 +24,12 @@ import akka.actor.Status.{ Success, Failure } import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter -import akka.testkit.{ TimingTest, TestKit, TestProbe } +import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe } import org.apache.camel.impl.DefaultCamelContext import concurrent.{ Await, Promise, Future } import akka.util.Timeout import akka.actor._ +import akka.testkit._ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { implicit val timeout = Timeout(10 seconds) @@ -111,10 +112,18 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with } "response is not sent by actor" must { - + val latch = TestLatch(1) + val callback = new AsyncCallback { + def done(doneSync: Boolean) { + latch.countDown() + } + } def process() = { producer = given(outCapable = true, replyTimeout = 100 millis) - time(producer.processExchangeAdapter(exchange)) + val duration = time(producer.processExchangeAdapter(exchange, callback)) + Await.ready(latch, 1.seconds.dilated) + latch.reset + duration } "timeout after replyTimeout" taggedAs TimingTest in { @@ -129,6 +138,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "set failure message to timeout" in { process() + verify(exchange).setFailure(any[FailureResult]) } }