fixed timing test when response is not sent by actor
This commit is contained in:
parent
39b4b52e6b
commit
8ef2a47069
3 changed files with 62 additions and 34 deletions
|
|
@ -135,7 +135,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
|
||||||
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
|
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
|
||||||
val isDone = new CountDownLatch(1)
|
val isDone = new CountDownLatch(1)
|
||||||
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
|
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
|
||||||
isDone.await(camel.settings.ReplyTimeout.toMillis, TimeUnit.MILLISECONDS)
|
isDone.await(endpoint.replyTimeout.length, endpoint.replyTimeout.unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -75,10 +75,10 @@ private[camel] object TestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
def time[A](block: ⇒ A): FiniteDuration = {
|
def time[A](block: ⇒ A): FiniteDuration = {
|
||||||
val start = System.currentTimeMillis()
|
val start = System.nanoTime()
|
||||||
block
|
block
|
||||||
val duration = System.currentTimeMillis() - start
|
val duration = System.nanoTime() - start
|
||||||
duration millis
|
duration nanos
|
||||||
}
|
}
|
||||||
|
|
||||||
def anInstanceOf[T](implicit tag: ClassTag[T]) = {
|
def anInstanceOf[T](implicit tag: ClassTag[T]) = {
|
||||||
|
|
|
||||||
|
|
@ -24,11 +24,12 @@ import akka.actor.Status.{ Success, Failure }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.ActorSystem.Settings
|
import akka.actor.ActorSystem.Settings
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.testkit.{ TimingTest, TestKit, TestProbe }
|
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
|
||||||
import org.apache.camel.impl.DefaultCamelContext
|
import org.apache.camel.impl.DefaultCamelContext
|
||||||
import concurrent.{ Await, Promise, Future }
|
import concurrent.{ Await, Promise, Future }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
import akka.testkit._
|
||||||
|
|
||||||
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
|
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
|
||||||
implicit val timeout = Timeout(10 seconds)
|
implicit val timeout = Timeout(10 seconds)
|
||||||
|
|
@ -65,9 +66,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
"process the exchange" in {
|
"process the exchange" in {
|
||||||
producer = given(outCapable = false, autoAck = false)
|
producer = given(outCapable = false, autoAck = false)
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
val future = Future {
|
val future = Future { producer.processExchangeAdapter(exchange) }
|
||||||
producer.processExchangeAdapter(exchange)
|
|
||||||
}
|
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
probe.expectMsgType[CamelMessage]
|
probe.expectMsgType[CamelMessage]
|
||||||
info("message sent to consumer")
|
info("message sent to consumer")
|
||||||
|
|
@ -111,10 +110,21 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
}
|
}
|
||||||
|
|
||||||
"response is not sent by actor" must {
|
"response is not sent by actor" must {
|
||||||
|
val latch = TestLatch(1)
|
||||||
|
val callback = new AsyncCallback {
|
||||||
|
def done(doneSync: Boolean) {
|
||||||
|
latch.countDown()
|
||||||
|
}
|
||||||
|
}
|
||||||
def process() = {
|
def process() = {
|
||||||
producer = given(outCapable = true, replyTimeout = 100 millis)
|
producer = given(outCapable = true, replyTimeout = 100 millis)
|
||||||
time(producer.processExchangeAdapter(exchange))
|
val duration = time {
|
||||||
|
producer.processExchangeAdapter(exchange, callback)
|
||||||
|
// wait for the actor to complete the callback
|
||||||
|
Await.ready(latch, 1.seconds.dilated)
|
||||||
|
}
|
||||||
|
latch.reset()
|
||||||
|
duration
|
||||||
}
|
}
|
||||||
|
|
||||||
"timeout after replyTimeout" taggedAs TimingTest in {
|
"timeout after replyTimeout" taggedAs TimingTest in {
|
||||||
|
|
@ -159,16 +169,20 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
|
|
||||||
val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)
|
val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)
|
||||||
|
|
||||||
asyncCallback.expectNoCallWithin(100 millis); info("no async callback before response")
|
asyncCallback.expectNoCallWithin(100 millis)
|
||||||
|
info("no async callback before response")
|
||||||
|
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
probe.expectMsgType[CamelMessage]
|
probe.expectMsgType[CamelMessage]
|
||||||
probe.sender ! "some message"
|
probe.sender ! "some message"
|
||||||
}
|
}
|
||||||
doneSync must be(false); info("done async")
|
doneSync must be(false)
|
||||||
|
info("done async")
|
||||||
|
|
||||||
asyncCallback.expectDoneAsyncWithin(1 second); info("async callback received")
|
asyncCallback.expectDoneAsyncWithin(1 second)
|
||||||
verify(exchange).setResponse(msg("some message")); info("response as expected")
|
info("async callback received")
|
||||||
|
verify(exchange).setResponse(msg("some message"))
|
||||||
|
info("response as expected")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -197,7 +211,10 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
producer.processExchangeAdapter(exchange, asyncCallback)
|
producer.processExchangeAdapter(exchange, asyncCallback)
|
||||||
asyncCallback.awaitCalled(100 millis)
|
asyncCallback.awaitCalled(100 millis)
|
||||||
verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[FailureResult] {
|
verify(exchange).setFailure(Matchers.argThat(new ArgumentMatcher[FailureResult] {
|
||||||
def matches(failure: AnyRef) = { failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException]); true }
|
def matches(failure: AnyRef) = {
|
||||||
|
failure.asInstanceOf[FailureResult].cause must be(anInstanceOf[TimeoutException])
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
@ -221,9 +238,12 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
producer = given(outCapable = false, autoAck = true)
|
producer = given(outCapable = false, autoAck = true)
|
||||||
val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)
|
val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)
|
||||||
|
|
||||||
doneSync must be(true); info("done sync")
|
doneSync must be(true)
|
||||||
asyncCallback.expectDoneSyncWithin(1 second); info("async callback called")
|
info("done sync")
|
||||||
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
|
asyncCallback.expectDoneSyncWithin(1 second)
|
||||||
|
info("async callback called")
|
||||||
|
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||||
|
info("no response forwarded to exchange")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -238,11 +258,14 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
|
|
||||||
doneSync must be(false)
|
doneSync must be(false)
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
probe.expectMsgType[CamelMessage]; info("message sent to consumer")
|
probe.expectMsgType[CamelMessage]
|
||||||
|
info("message sent to consumer")
|
||||||
probe.sender ! Ack
|
probe.sender ! Ack
|
||||||
asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called")
|
asyncCallback.expectDoneAsyncWithin(remaining)
|
||||||
|
info("async callback called")
|
||||||
}
|
}
|
||||||
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
|
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||||
|
info("no response forwarded to exchange")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -253,12 +276,16 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
producer.processExchangeAdapter(exchange, asyncCallback)
|
producer.processExchangeAdapter(exchange, asyncCallback)
|
||||||
|
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
probe.expectMsgType[CamelMessage]; info("message sent to consumer")
|
probe.expectMsgType[CamelMessage]
|
||||||
|
info("message sent to consumer")
|
||||||
probe.sender ! "some neither Ack nor Failure response"
|
probe.sender ! "some neither Ack nor Failure response"
|
||||||
asyncCallback.expectDoneAsyncWithin(remaining); info("async callback called")
|
asyncCallback.expectDoneAsyncWithin(remaining)
|
||||||
|
info("async callback called")
|
||||||
}
|
}
|
||||||
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
|
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||||
verify(exchange).setFailure(any[FailureResult]); info("failure set")
|
info("no response forwarded to exchange")
|
||||||
|
verify(exchange).setFailure(any[FailureResult])
|
||||||
|
info("failure set")
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -282,12 +309,15 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
||||||
|
|
||||||
doneSync must be(false)
|
doneSync must be(false)
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
probe.expectMsgType[CamelMessage]; info("message sent to consumer")
|
probe.expectMsgType[CamelMessage]
|
||||||
|
info("message sent to consumer")
|
||||||
probe.sender ! Failure(new Exception)
|
probe.sender ! Failure(new Exception)
|
||||||
asyncCallback.awaitCalled(remaining);
|
asyncCallback.awaitCalled(remaining)
|
||||||
}
|
}
|
||||||
verify(exchange, never()).setResponse(any[CamelMessage]); info("no response forwarded to exchange")
|
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||||
verify(exchange).setFailure(any[FailureResult]); info("failure set")
|
info("no response forwarded to exchange")
|
||||||
|
verify(exchange).setFailure(any[FailureResult])
|
||||||
|
info("failure set")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -363,10 +393,8 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
|
||||||
def createAsyncCallback = new TestAsyncCallback
|
def createAsyncCallback = new TestAsyncCallback
|
||||||
|
|
||||||
class TestAsyncCallback extends AsyncCallback {
|
class TestAsyncCallback extends AsyncCallback {
|
||||||
def expectNoCallWithin(duration: Duration) {
|
def expectNoCallWithin(duration: Duration): Unit =
|
||||||
if (callbackReceived.await(duration.toNanos, TimeUnit.NANOSECONDS)) fail("NOT expected callback, but received one!")
|
if (callbackReceived.await(duration.length, duration.unit)) fail("NOT expected callback, but received one!")
|
||||||
}
|
|
||||||
|
|
||||||
def awaitCalled(timeout: Duration = 1 second) { valueWithin(1 second) }
|
def awaitCalled(timeout: Duration = 1 second) { valueWithin(1 second) }
|
||||||
|
|
||||||
val callbackReceived = new CountDownLatch(1)
|
val callbackReceived = new CountDownLatch(1)
|
||||||
|
|
@ -378,7 +406,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def valueWithin(implicit timeout: FiniteDuration) =
|
private[this] def valueWithin(implicit timeout: FiniteDuration) =
|
||||||
if (!callbackReceived.await(timeout.toNanos, TimeUnit.NANOSECONDS)) fail("Callback not received!")
|
if (!callbackReceived.await(timeout.length, timeout.unit)) fail("Callback not received!")
|
||||||
else callbackValue.get
|
else callbackValue.get
|
||||||
|
|
||||||
def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
|
def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue