Code Scouting after review. See #2339
This commit is contained in:
parent
1e632e5a5c
commit
5e1c00b386
2 changed files with 23 additions and 45 deletions
|
|
@ -16,11 +16,9 @@ import org.apache.camel.builder.Builder
|
|||
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
|
||||
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
||||
import akka.testkit._
|
||||
import akka.actor.Status.Failure
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.Status.Failure
|
||||
import akka.actor.ActorKilledException
|
||||
|
||||
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
|
||||
"ConsumerIntegrationTest" must {
|
||||
|
|
@ -68,7 +66,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
def endpointUri = "direct:a2"
|
||||
|
||||
def receive = {
|
||||
case "throw" ⇒ throw new Exception
|
||||
case "throw" ⇒ throw new TestException("")
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
}
|
||||
|
||||
|
|
@ -76,12 +74,11 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
restarted.countDown()
|
||||
}
|
||||
})
|
||||
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||
filterEvents(EventFilter[TestException](occurrences = 1)) {
|
||||
consumer ! "throw"
|
||||
Await.ready(restarted, defaultTimeout)
|
||||
|
||||
val response = camel.sendTo("direct:a2", msg = "xyz")
|
||||
response must be("received xyz")
|
||||
camel.sendTo("direct:a2", msg = "xyz") must be("received xyz")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -111,10 +108,10 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
"Error passing consumer supports error handling through route modification" in {
|
||||
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||
rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end
|
||||
}
|
||||
})
|
||||
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||
filterEvents(EventFilter[TestException](occurrences = 1)) {
|
||||
camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello")
|
||||
}
|
||||
}
|
||||
|
|
@ -122,10 +119,10 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
"Error passing consumer supports redelivery through route modification" in {
|
||||
start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing {
|
||||
override def onRouteDefinition(rd: RouteDefinition) = {
|
||||
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
||||
rd.onException(classOf[TestException]).maximumRedeliveries(1).end
|
||||
}
|
||||
})
|
||||
filterEvents(EventFilter[Exception](occurrences = 1)) {
|
||||
filterEvents(EventFilter[TestException](occurrences = 1)) {
|
||||
camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello")
|
||||
}
|
||||
}
|
||||
|
|
@ -166,7 +163,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
|
||||
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body)
|
||||
case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -177,7 +174,7 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
|
|||
if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false))
|
||||
sender ! ("accepted: %s" format msg.body)
|
||||
else
|
||||
throw new Exception("rejected: %s" format msg.body)
|
||||
throw new TestException("rejected: %s" format msg.body)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -196,3 +193,5 @@ trait ErrorPassing {
|
|||
trait ManualAckConsumer extends Consumer {
|
||||
override def autoAck = false
|
||||
}
|
||||
|
||||
class TestException(msg: String) extends Exception(msg)
|
||||
|
|
|
|||
|
|
@ -71,15 +71,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
// a failure response must have been returned by the producer
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
}
|
||||
val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) }
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
}
|
||||
Await.ready(latch, timeoutDuration)
|
||||
deadActor must be(Some(producer))
|
||||
|
|
@ -121,15 +115,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
|
||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
// a failure response must have been returned by the producer
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
}
|
||||
val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) }
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -154,14 +142,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
|
||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
// a failure response must have been returned by the forward target
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
}
|
||||
val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) }
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -204,13 +187,9 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
}
|
||||
val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) }
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue