diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 2a1be08354..0e45a4eaec 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -135,7 +135,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = { val isDone = new CountDownLatch(1) processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } }) - isDone.await(camel.settings.ReplyTimeout.toMillis, TimeUnit.MILLISECONDS) + isDone.await(endpoint.replyTimeout.length, endpoint.replyTimeout.unit) } /** diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index c25ccdab3c..370e5f99f9 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -75,10 +75,10 @@ private[camel] object TestSupport { } def time[A](block: ⇒ A): FiniteDuration = { - val start = System.currentTimeMillis() + val start = System.nanoTime() block - val duration = System.currentTimeMillis() - start - duration millis + val duration = System.nanoTime() - start + duration nanos } def anInstanceOf[T](implicit tag: ClassTag[T]) = { diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 8f7845d3ff..d6088fef77 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -24,11 +24,12 @@ import akka.actor.Status.{ Success, Failure } import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter -import akka.testkit.{ TimingTest, TestKit, TestProbe } +import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe } import org.apache.camel.impl.DefaultCamelContext import concurrent.{ Await, Promise, Future } import akka.util.Timeout import akka.actor._ +import akka.testkit._ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { implicit val timeout = Timeout(10 seconds) @@ -65,9 +66,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "process the exchange" in { producer = given(outCapable = false, autoAck = false) import system.dispatcher - val future = Future { - producer.processExchangeAdapter(exchange) - } + val future = Future { producer.processExchangeAdapter(exchange) } within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") @@ -111,10 +110,21 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with } "response is not sent by actor" must { - + val latch = TestLatch(1) + val callback = new AsyncCallback { + def done(doneSync: Boolean) { + latch.countDown() + } + } def process() = { producer = given(outCapable = true, replyTimeout = 100 millis) - time(producer.processExchangeAdapter(exchange)) + val duration = time { + producer.processExchangeAdapter(exchange, callback) + // wait for the actor to complete the callback + Await.ready(latch, 1.seconds.dilated) + } + latch.reset() + duration } "timeout after replyTimeout" taggedAs TimingTest in { @@ -159,16 +169,20 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) - asyncCallback.expectNoCallWithin(100 millis); info("no async callback before response") + asyncCallback.expectNoCallWithin(100 millis) + info("no async callback before response") within(1 second) { probe.expectMsgType[CamelMessage] probe.sender ! "some message" } - doneSync must be(false); info("done async") + doneSync must be(false) + info("done async") - asyncCallback.expectDoneAsyncWithin(1 second); info("async callback received") - verify(exchange).setResponse(msg("some message")); info("response as expected") + asyncCallback.expectDoneAsyncWithin(1 second) + info("async callback received") + verify(exchange).setResponse(msg("some message")) + info("response as expected") } } @@ -197,7 +211,10 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled(100 millis) verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[FailureResult] { - def matches(failure: AnyRef) = { failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException]); true } + def matches(failure: AnyRef) = { + failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException]) + true + } })) } @@ -221,9 +238,12 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer = given(outCapable = false, autoAck = true) val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) - doneSync must be(true); info("done sync") - asyncCallback.expectDoneSyncWithin(1 second); info("async callback called") - verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") + doneSync must be(true) + info("done sync") + asyncCallback.expectDoneSyncWithin(1 second) + info("async callback called") + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") } } @@ -238,11 +258,14 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with doneSync must be(false) within(1 second) { - probe.expectMsgType[CamelMessage]; info("message sent to consumer") + probe.expectMsgType[CamelMessage] + info("message sent to consumer") probe.sender ! Ack - asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called") + asyncCallback.expectDoneAsyncWithin(remaining) + info("async callback called") } - verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") } } @@ -253,12 +276,16 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with producer.processExchangeAdapter(exchange, asyncCallback) within(1 second) { - probe.expectMsgType[CamelMessage]; info("message sent to consumer") + probe.expectMsgType[CamelMessage] + info("message sent to consumer") probe.sender ! "some neither Ack nor Failure response" - asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called") + asyncCallback.expectDoneAsyncWithin(remaining) + info("async callback called") } - verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") - verify(exchange).setFailure(any[FailureResult]); info("failure set") + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") + verify(exchange).setFailure(any[FailureResult]) + info("failure set") } } @@ -282,12 +309,15 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with doneSync must be(false) within(1 second) { - probe.expectMsgType[CamelMessage]; info("message sent to consumer") + probe.expectMsgType[CamelMessage] + info("message sent to consumer") probe.sender ! Failure(new Exception) - asyncCallback.awaitCalled(remaining); + asyncCallback.awaitCalled(remaining) } - verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange") - verify(exchange).setFailure(any[FailureResult]); info("failure set") + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") + verify(exchange).setFailure(any[FailureResult]) + info("failure set") } } } @@ -363,10 +393,8 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo def createAsyncCallback = new TestAsyncCallback class TestAsyncCallback extends AsyncCallback { - def expectNoCallWithin(duration: Duration) { - if (callbackReceived.await(duration.toNanos, TimeUnit.NANOSECONDS)) fail("NOT expected callback, but received one!") - } - + def expectNoCallWithin(duration: Duration): Unit = + if (callbackReceived.await(duration.length, duration.unit)) fail("NOT expected callback, but received one!") def awaitCalled(timeout: Duration = 1 second) { valueWithin(1 second) } val callbackReceived = new CountDownLatch(1) @@ -378,7 +406,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } private[this] def valueWithin(implicit timeout: FiniteDuration) = - if (!callbackReceived.await(timeout.toNanos, TimeUnit.NANOSECONDS)) fail("Callback not received!") + if (!callbackReceived.await(timeout.length, timeout.unit)) fail("Callback not received!") else callbackValue.get def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")