CAMEL: Used endpoint replyTimeout in processExchangeAdapter, fixed ActorProducerTest which did not wait for completion
This commit is contained in:
parent
6d8d4b4ec9
commit
ecf970dd4d
2 changed files with 14 additions and 4 deletions
|
|
@ -135,7 +135,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
|
|||
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
|
||||
val isDone = new CountDownLatch(1)
|
||||
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
|
||||
isDone.await(camel.settings.ReplyTimeout.toMillis, TimeUnit.MILLISECONDS)
|
||||
isDone.await(endpoint.replyTimeout.toMillis, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -24,11 +24,12 @@ import akka.actor.Status.{ Success, Failure }
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.testkit.{ TimingTest, TestKit, TestProbe }
|
||||
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
|
||||
import org.apache.camel.impl.DefaultCamelContext
|
||||
import concurrent.{ Await, Promise, Future }
|
||||
import akka.util.Timeout
|
||||
import akka.actor._
|
||||
import akka.testkit._
|
||||
|
||||
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
|
||||
implicit val timeout = Timeout(10 seconds)
|
||||
|
|
@ -111,10 +112,18 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
}
|
||||
|
||||
"response is not sent by actor" must {
|
||||
|
||||
val latch = TestLatch(1)
|
||||
val callback = new AsyncCallback {
|
||||
def done(doneSync: Boolean) {
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
def process() = {
|
||||
producer = given(outCapable = true, replyTimeout = 100 millis)
|
||||
time(producer.processExchangeAdapter(exchange))
|
||||
val duration = time(producer.processExchangeAdapter(exchange, callback))
|
||||
Await.ready(latch, 1.seconds.dilated)
|
||||
latch.reset
|
||||
duration
|
||||
}
|
||||
|
||||
"timeout after replyTimeout" taggedAs TimingTest in {
|
||||
|
|
@ -129,6 +138,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
|
||||
"set failure message to timeout" in {
|
||||
process()
|
||||
|
||||
verify(exchange).setFailure(any[FailureResult])
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue