Camel's non-blocking routing engine now fully supported

This commit is contained in:
Martin Krasser 2010-07-15 07:43:08 +02:00
parent 48aa70f03d
commit 5ba45bffcf
3 changed files with 41 additions and 175 deletions

View file

@ -6,8 +6,8 @@ package se.scalablesolutions.akka.camel
import CamelMessageConversion.toExchangeAdapter
import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate}
import org.apache.camel.impl.DefaultExchange
import org.apache.camel._
import org.apache.camel.processor.SendProcessor
import org.apache.camel.spi.Synchronization
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
@ -23,13 +23,9 @@ trait Producer { this: Actor =>
private val headersToCopyDefault = Set(Message.MessageExchangeId)
/**
* If set to true (default), communication with the Camel endpoint is done via the Camel
* <a href="http://camel.apache.org/async.html">Async API</a>. Camel then processes the
* message in a separate thread. If set to false, the actor thread is blocked until Camel
* has finished processing the produced message.
*/
def async: Boolean = true
private lazy val endpoint = CamelContextManager.context.getEndpoint(endpointUri)
private lazy val processor = createProcessor
/**
* If set to false (default), this producer expects a response message from the Camel endpoint.
@ -51,85 +47,38 @@ trait Producer { this: Actor =>
def headersToCopy: Set[String] = headersToCopyDefault
/**
* Returns the producer template from the CamelContextManager. Applications either have to ensure
* proper initialization of CamelContextManager or override this method.
*
* @see CamelContextManager.
* ...
*/
protected def template: ProducerTemplate = CamelContextManager.template
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOnewaySync(msg: Any): Unit =
template.send(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOnewayAsync(msg: Any): Unit =
template.asyncSend(
endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
*/
protected def produceSync(msg: Any): Any = {
val cmsg = Message.canonicalize(msg)
val requestProcessor = new Processor() {
def process(exchange: Exchange) = exchange.fromRequestMessage(cmsg)
}
val result = template.request(endpointUri, requestProcessor)
if (result.isFailed) result.toFailureMessage(cmsg.headers(headersToCopy))
else result.toResponseMessage(cmsg.headers(headersToCopy))
override def shutdown {
processor.stop
}
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel. The response message is returned asynchronously to
* the original sender (or sender future).
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
* @see ProducerResponseSender
*/
protected def produceAsync(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg)
val sync = new ProducerResponseSender(
cmsg.headers(headersToCopy), self.sender, self.senderFuture, this)
template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync)
protected def produceOneway(msg: Any): Unit = {
val exchange = createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg))
processor.process(exchange, new AsyncCallback {
def done(doneSync: Boolean): Unit = { /* ignore because it's an in-only exchange */ }
})
}
protected def produceTwoway(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg)
val exchange = createInOutExchange.fromRequestMessage(cmsg)
processor.process(exchange, new AsyncCallback {
def done(doneSync: Boolean): Unit = {
val response = if (exchange.isFailed)
exchange.toFailureMessage(cmsg.headers(headersToCopy))
else
exchange.toResponseMessage(cmsg.headers(headersToCopy))
self.reply(response)
}
})
}
/**
* Default implementation for Actor.receive. Implementors may choose to
* <code>def receive = produce</code>. This partial function calls one of
* the protected produce methods depending on the return values of
* <code>oneway</code> and <code>async</code>.
*/
protected def produce: Receive = {
case msg => {
if ( oneway && !async) produceOnewaySync(msg)
else if ( oneway && async) produceOnewayAsync(msg)
else if (!oneway && !async) self.reply(produceSync(msg))
else /*(!oneway && async)*/ produceAsync(msg)
case msg => if (oneway) {
produceOneway(msg)
} else {
val result = produceTwoway(msg)
}
}
@ -149,46 +98,17 @@ trait Producer { this: Actor =>
protected def createInOutExchange: Exchange = createExchange(ExchangePattern.InOut)
/**
* Creates a new Exchange with given pattern from the CamelContext managed by
* CamelContextManager. Applications either have to ensure proper initialization
* of CamelContextManager or override this method.
*
* @see CamelContextManager.
* ...
*/
protected def createExchange(pattern: ExchangePattern): Exchange =
new DefaultExchange(CamelContextManager.context, pattern)
}
/**
* Synchronization object that sends responses asynchronously to initial senders. This
* class is used by Producer for asynchronous two-way messaging with a Camel endpoint.
*
* @author Martin Krasser
*/
private[camel] class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]],
producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender
protected def createExchange(pattern: ExchangePattern): Exchange = endpoint.createExchange(pattern)
/**
* Replies a Failure message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*
*/
def onFailure(exchange: Exchange) = reply(exchange.toFailureMessage(headers))
/**
* Replies a response Message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*/
def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers))
private def reply(message: Any) = {
if (senderFuture.isDefined) senderFuture.get completeWithResult message
else if (sender.isDefined) sender.get ! message
else log.warning("No destination for sending response")
private def createProcessor = {
val sendProcessor = new SendProcessor(endpoint)
sendProcessor.start
sendProcessor
}
}
@ -201,12 +121,3 @@ trait Oneway extends Producer { this: Actor =>
override def oneway = true
}
/**
* A synchronous producer.
*
* @author Martin Krasser
*/
trait Sync extends Producer { this: Actor =>
override def async = false
}

View file

@ -27,21 +27,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
feature("Produce a message to a Camel endpoint") {
scenario("produce message sync and receive response") {
given("a registered synchronous two-way producer for endpoint direct:producer-test-2")
val producer = actorOf(new TestProducer("direct:producer-test-2") with Sync)
producer.start
when("a test message is sent to the producer")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer !! message
then("the expected result message should be returned including a correlation identifier")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
assert(result === Some(expected))
}
scenario("produce message async and receive response") {
scenario("produce message and receive response") {
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
val producer = actorOf(new TestProducer("direct:producer-test-2"))
producer.start
@ -55,23 +41,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
assert(result === Some(expected))
}
scenario("produce message sync and receive failure") {
given("a registered synchronous two-way producer for endpoint direct:producer-test-2")
val producer = actorOf(new TestProducer("direct:producer-test-2") with Sync)
producer.start
when("a fail message is sent to the producer")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = (producer !! message).as[Failure]
then("the expected failure message should be returned including a correlation identifier")
val expectedFailureText = result.get.cause.getMessage
val expectedHeaders = result.get.headers
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(Message.MessageExchangeId -> "123"))
}
scenario("produce message async and receive failure") {
scenario("produce message and receive failure") {
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
val producer = actorOf(new TestProducer("direct:producer-test-2"))
producer.start
@ -87,20 +57,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
assert(expectedHeaders === Map(Message.MessageExchangeId -> "123"))
}
scenario("produce message sync oneway") {
given("a registered synchronous one-way producer for endpoint direct:producer-test-1")
val producer = actorOf(new TestProducer("direct:producer-test-1") with Sync with Oneway)
producer.start
when("a test message is sent to the producer")
mockEndpoint.expectedBodiesReceived("test")
producer ! Message("test")
then("the expected message should have been sent to mock:mock")
mockEndpoint.assertIsSatisfied
}
scenario("produce message async oneway") {
scenario("produce message oneway") {
given("a registered asynchronous one-way producer for endpoint direct:producer-test-1")
val producer = actorOf(new TestProducer("direct:producer-test-1") with Oneway)
producer.start

View file

@ -28,9 +28,7 @@ class RemoteActor2 extends Actor with Consumer {
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
override def oneway = false // default
override def async = true // default
}
class Consumer1 extends Actor with Consumer with Logging {