This commit is contained in:
parent
f63bd41d22
commit
d7926fb6c6
2 changed files with 112 additions and 18 deletions
|
|
@ -41,6 +41,8 @@ trait Producer { this: Actor =>
|
|||
*/
|
||||
def oneway: Boolean = false
|
||||
|
||||
def forwardResultTo: Option[ActorRef] = None
|
||||
|
||||
/**
|
||||
* Returns the Camel endpoint URI to produce messages to.
|
||||
*/
|
||||
|
|
@ -91,10 +93,27 @@ trait Producer { this: Actor =>
|
|||
val senderFuture = self.senderFuture
|
||||
|
||||
def done(doneSync: Boolean): Unit = {
|
||||
val response =
|
||||
if (exchange.isFailed) exchange.toFailureMessage(cmsg.headers(headersToCopy))
|
||||
else exchange.toResponseMessage(cmsg.headers(headersToCopy))
|
||||
val response = if (exchange.isFailed)
|
||||
exchange.toFailureMessage(cmsg.headers(headersToCopy))
|
||||
else
|
||||
exchange.toResponseMessage(cmsg.headers(headersToCopy))
|
||||
|
||||
if (forwardResultTo.isDefined)
|
||||
forward(response, forwardResultTo.get)
|
||||
else
|
||||
reply(response)
|
||||
}
|
||||
|
||||
private def forward(response: Any, target: ActorRef) = {
|
||||
// TODO: avoid redundancy to ActorRef.forward
|
||||
if (target.isRunning) {
|
||||
if (senderFuture.isDefined) target.postMessageToMailboxAndCreateFutureResultWithTimeout(response, target.timeout, sender, senderFuture)
|
||||
else target.postMessageToMailbox(response, sender) // initial sender doesn't need be an actor
|
||||
}
|
||||
}
|
||||
|
||||
private def reply(response: Any) = {
|
||||
// TODO: avoid redundancy to ActorRef.reply
|
||||
if (senderFuture.isDefined) senderFuture.get completeWithResult response
|
||||
else if (sender.isDefined) sender.get ! response
|
||||
else log.warning("No destination for sending response")
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@ import org.apache.camel.builder.RouteBuilder
|
|||
import org.apache.camel.component.mock.MockEndpoint
|
||||
import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, Actor, ActorRegistry}
|
||||
|
||||
class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
|
||||
import ProducerFeatureTest._
|
||||
|
|
@ -27,30 +27,30 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
|
|||
|
||||
feature("Produce a message to a Camel endpoint") {
|
||||
|
||||
scenario("produce message and receive response") {
|
||||
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
|
||||
scenario("produce message and receive normal response") {
|
||||
given("a registered two-way producer")
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2"))
|
||||
producer.start
|
||||
|
||||
when("a test message is sent to the producer")
|
||||
when("a test message is sent to the producer with !!")
|
||||
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
|
||||
val result = producer !! message
|
||||
|
||||
then("the expected result message should be returned including a correlation identifier")
|
||||
then("a normal response should have been returned by the producer")
|
||||
val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
|
||||
assert(result === Some(expected))
|
||||
}
|
||||
|
||||
scenario("produce message and receive failure") {
|
||||
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
|
||||
scenario("produce message and receive failure response") {
|
||||
given("a registered two-way producer")
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2"))
|
||||
producer.start
|
||||
|
||||
when("a fail message is sent to the producer")
|
||||
when("a test message causing an exception is sent to the producer with !!")
|
||||
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
|
||||
val result = (producer !! message).as[Failure]
|
||||
|
||||
then("the expected failure message should be returned including a correlation identifier")
|
||||
then("a failure response should have been returned by the producer")
|
||||
val expectedFailureText = result.get.cause.getMessage
|
||||
val expectedHeaders = result.get.headers
|
||||
assert(expectedFailureText === "failure")
|
||||
|
|
@ -58,24 +58,24 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
|
|||
}
|
||||
|
||||
scenario("produce message oneway") {
|
||||
given("a registered asynchronous one-way producer for endpoint direct:producer-test-1")
|
||||
given("a registered one-way producer")
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-1") with Oneway)
|
||||
producer.start
|
||||
|
||||
when("a test message is sent to the producer")
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("test")
|
||||
producer ! Message("test")
|
||||
|
||||
then("the expected message should have been sent to mock:mock")
|
||||
then("the test message should have been sent to mock:mock")
|
||||
mockEndpoint.assertIsSatisfied
|
||||
}
|
||||
|
||||
scenario("produce message twoway without sender reference") {
|
||||
given("a registered asynchronous two-way producer for endpoint direct:producer-test-1")
|
||||
given("a registered two-way producer")
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-1"))
|
||||
producer.start
|
||||
|
||||
when("a test message is sent to the producer")
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("test")
|
||||
producer ! Message("test")
|
||||
|
||||
|
|
@ -84,16 +84,91 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
|
|||
}
|
||||
}
|
||||
|
||||
feature("Produce a message to a Camel endpoint and then forward the result") {
|
||||
|
||||
scenario("produce message, forward and receive normal response") {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val responder = actorOf[ReplyingForwardTarget].start
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start
|
||||
|
||||
when("a test message is sent to the producer with !!")
|
||||
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
|
||||
val result = producer !! message
|
||||
|
||||
then("a normal response should have been returned by the forward target")
|
||||
val expected = Message("received test", Map(Message.MessageExchangeId -> "123", "test" -> "result"))
|
||||
assert(result === Some(expected))
|
||||
}
|
||||
|
||||
scenario("produce message, forward and receive failure response") {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val responder = actorOf[ReplyingForwardTarget].start
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start
|
||||
|
||||
when("a test message causing an exception is sent to the producer with !!")
|
||||
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
|
||||
val result = (producer !! message).as[Failure]
|
||||
|
||||
then("a failure response should have been returned by the forward target")
|
||||
val expectedFailureText = result.get.cause.getMessage
|
||||
val expectedHeaders = result.get.headers
|
||||
assert(expectedFailureText === "failure")
|
||||
assert(expectedHeaders === Map(Message.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
}
|
||||
|
||||
scenario("produce message, forward and produce normal response") {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
val responder = actorOf[ProducingForwardTarget].start
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start
|
||||
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("received test")
|
||||
val result = producer ! Message("test")
|
||||
|
||||
then("a normal response should have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied
|
||||
}
|
||||
|
||||
scenario("produce message, forward and produce failure response") {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
val responder = actorOf[ProducingForwardTarget].start
|
||||
val producer = actorOf(new TestProducer("direct:producer-test-2", Some(responder))).start
|
||||
|
||||
when("a test message causing an exception is sent to the producer with !")
|
||||
mockEndpoint.expectedMessageCount(1)
|
||||
mockEndpoint.message(0).body().isInstanceOf(classOf[Failure])
|
||||
val result = producer ! Message("fail")
|
||||
|
||||
then("a failure response should have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied
|
||||
}
|
||||
}
|
||||
|
||||
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
|
||||
}
|
||||
|
||||
object ProducerFeatureTest {
|
||||
class TestProducer(uri: String) extends Actor with Producer {
|
||||
class TestProducer(uri: String, target: Option[ActorRef] = None) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
override def forwardResultTo = target
|
||||
}
|
||||
|
||||
class ReplyingForwardTarget extends Actor {
|
||||
protected def receive = {
|
||||
case msg: Message =>
|
||||
self.reply(msg.addHeader("test" -> "result"))
|
||||
case msg: Failure =>
|
||||
self.reply(Failure(msg.cause, msg.headers + ("test" -> "failure")))
|
||||
}
|
||||
}
|
||||
|
||||
class ProducingForwardTarget extends Actor with Producer with Oneway {
|
||||
def endpointUri = "direct:forward-test-1"
|
||||
}
|
||||
|
||||
class TestRoute extends RouteBuilder {
|
||||
def configure {
|
||||
from("direct:forward-test-1").to("mock:mock")
|
||||
// for one-way messaging tests
|
||||
from("direct:producer-test-1").to("mock:mock")
|
||||
// for two-way messaging tests
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue