Processed review wip-camel pull request 344

This commit is contained in:
RayRoestenburg 2012-03-01 17:32:10 +01:00
parent 4d6511c5c6
commit f74616f828
56 changed files with 544 additions and 1992 deletions

View file

@ -7,29 +7,30 @@ package akka.camel
import akka.actor._
import org.scalatest.matchers.MustMatchers
import akka.util.duration._
import java.util.concurrent.TimeUnit._
import TestSupport._
import org.scalatest.WordSpec
import org.apache.camel.model.RouteDefinition
import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException, CountDownLatch }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.testkit.TestLatch
import akka.dispatch.Await
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {
private val defaultTimeout = 10
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")))
intercept[FailedToCreateRouteException] {
camel.awaitActivation(actorRef, timeout = 1 second)
camel.awaitActivation(actorRef, timeout = defaultTimeout seconds)
}
}
"Consumer must support in-out messaging" in {
start(new Consumer {
def endpointUri = "direct:a1"
protected def receive = {
case m: Message sender ! "received " + m.bodyAs[String]
def receive = {
case m: CamelMessage sender ! "received " + m.bodyAs[String]
}
})
camel.sendTo("direct:a1", msg = "some message") must be("received some message")
@ -43,7 +44,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
override def replyTimeout = SHORT_TIMEOUT
def endpointUri = "direct:a3"
protected def receive = { case _ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
def receive = { case _ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
})
val exception = intercept[CamelExecutionException] {
@ -53,13 +54,13 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
}
"Consumer must process messages even after actor restart" in {
val restarted = new CountDownLatch(1)
val restarted = TestLatch()
val consumer = start(new Consumer {
def endpointUri = "direct:a2"
protected def receive = {
case "throw" throw new Exception
case m: Message sender ! "received " + m.bodyAs[String]
def receive = {
case "throw" throw new Exception
case m: CamelMessage sender ! "received " + m.bodyAs[String]
}
override def postRestart(reason: Throwable) {
@ -67,7 +68,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
}
})
consumer ! "throw"
if (!restarted.await(1, SECONDS)) fail("Actor failed to restart!")
Await.ready(restarted, defaultTimeout seconds)
val response = camel.sendTo("direct:a2", msg = "xyz")
response must be("received xyz")
@ -75,12 +76,12 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
"Consumer must unregister itself when stopped" in {
val consumer = start(new TestActor())
camel.awaitActivation(consumer, 1 second)
camel.awaitActivation(consumer, defaultTimeout seconds)
camel.routeCount must be > (0)
system.stop(consumer)
camel.awaitDeactivation(consumer, 1 second)
camel.awaitDeactivation(consumer, defaultTimeout seconds)
camel.routeCount must be(0)
}
@ -106,20 +107,20 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
"Consumer supports manual Ack" in {
start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
protected def receive = { case _ sender ! Ack }
def receive = { case _ sender ! Ack }
})
camel.template.asyncSendBody("direct:manual-ack", "some message").get(1, TimeUnit.SECONDS) must be(null) //should not timeout
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS) must be(null) //should not timeout
}
"Consumer handles manual Ack failure" in {
val someException = new Exception("e1")
start(new ManualAckConsumer() {
def endpointUri = "direct:manual-ack"
protected def receive = { case _ sender ! Failure(someException) }
def receive = { case _ sender ! Failure(someException) }
})
intercept[ExecutionException] {
camel.template.asyncSendBody("direct:manual-ack", "some message").get(1, TimeUnit.SECONDS)
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS)
}.getCause.getCause must be(someException)
}
@ -127,25 +128,25 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
start(new ManualAckConsumer() {
override def replyTimeout = 10 millis
def endpointUri = "direct:manual-ack"
protected def receive = { case _ }
def receive = { case _ }
})
intercept[ExecutionException] {
camel.template.asyncSendBody("direct:manual-ack", "some message").get(1, TimeUnit.SECONDS)
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout, TimeUnit.SECONDS)
}.getCause.getCause.getMessage must include("Failed to get Ack")
}
}
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: Message throw new Exception("error: %s" format msg.body)
case msg: CamelMessage throw new Exception("error: %s" format msg.body)
}
}
class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: Message
case msg: CamelMessage
if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false))
sender ! ("accepted: %s" format msg.body)
else
@ -155,5 +156,16 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
class TestActor(uri: String = "file://target/abcde") extends Consumer {
def endpointUri = uri
protected def receive = { case _ /* do nothing */ }
def receive = { case _ /* do nothing */ }
}
trait ErrorPassing {
self: Actor
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender ! Failure(reason)
}
}
trait ManualAckConsumer extends Consumer {
override def autoack = false
}