From 979cd371e61b4f130d85fd9ed47de43a1ca6721c Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sat, 14 Aug 2010 11:31:15 +0200 Subject: [PATCH] Closes #392 Support untyped Java actors as endpoint producer --- akka-camel/src/main/scala/Consumer.scala | 16 +-- akka-camel/src/main/scala/Producer.scala | 83 ++++++++++++++-- .../camel/SampleRemoteUntypedConsumer.java | 4 +- .../akka/camel/SampleUntypedConsumer.java | 4 +- .../camel/SampleUntypedConsumerBlocking.java | 2 +- .../SampleUntypedForwardingProducer.java | 18 ++++ .../camel/SampleUntypedReplyingProducer.java | 12 +++ .../scala/UntypedProducerFeatureTest.scala | 98 +++++++++++++++++++ 8 files changed, 218 insertions(+), 19 deletions(-) create mode 100644 akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java create mode 100644 akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java create mode 100644 akka-camel/src/test/scala/UntypedProducerFeatureTest.scala diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index f0976d5ed5..3f593e1e36 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -30,14 +30,16 @@ trait Consumer { self: Actor => * Java-friendly {@link Consumer} inherited by * * * * implementations. + * + * @author Martin Krasser */ -trait JavaConsumer extends Consumer { self: Actor => +trait UntypedConsumer extends Consumer { self: UntypedActor => final override def endpointUri = getEndpointUri @@ -60,19 +62,19 @@ trait JavaConsumer extends Consumer { self: Actor => * Subclass this abstract class to create an MDB-style untyped consumer actor. This * class is meant to be used from Java. */ -abstract class UntypedConsumer extends UntypedActor with JavaConsumer +abstract class UntypedConsumerActor extends UntypedActor with UntypedConsumer /** * Subclass this abstract class to create an MDB-style transacted untyped consumer * actor. This class is meant to be used from Java. */ -abstract class TransactedUntypedConsumer extends UntypedTransactor with JavaConsumer +abstract class UntypedConsumerTransactor extends UntypedTransactor with UntypedConsumer /** * Subclass this abstract class to create an MDB-style remote untyped consumer * actor. This class is meant to be used from Java. */ -abstract class RemoteUntypedConsumer(address: InetSocketAddress) extends RemoteUntypedActor(address) with JavaConsumer { +abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends RemoteUntypedActor(address) with UntypedConsumer { def this(host: String, port: Int) = this(new InetSocketAddress(host, port)) } diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index c49591ec7f..6f5c914a65 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -9,14 +9,14 @@ import CamelMessageConversion.toExchangeAdapter import org.apache.camel._ import org.apache.camel.processor.SendProcessor -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, UntypedActor} /** - * Mixed in by Actor implementations that produce messages to Camel endpoints. + * Support trait for producing messages to Camel endpoints. * * @author Martin Krasser */ -trait Producer { this: Actor => +trait ProducerSupport { this: Actor => /** * Message headers to copy by default from request message to response-message. @@ -141,11 +141,6 @@ trait Producer { this: Actor => case msg => if (!oneway) self.reply(msg) } - /** - * Default implementation of Actor.receive - */ - protected def receive = produce - /** * Creates a new Exchange with given pattern from the endpoint specified by * endpointUri. @@ -162,6 +157,78 @@ trait Producer { this: Actor => } } +/** + * Mixed in by Actor implementations that produce messages to Camel endpoints. + */ +trait Producer extends ProducerSupport { this: Actor => + + /** + * Default implementation of Actor.receive + */ + protected def receive = produce +} + +/** + * Java-friendly {@link ProducerSupport} inherited by {@link UntypedProducerActor} implementations. + * + * @author Martin Krasser + */ +trait UntypedProducer extends ProducerSupport { this: UntypedActor => + + final override def endpointUri = getEndpointUri + + final override def oneway = isOneway + + final override def receiveBeforeProduce = { + case msg => onReceiveBeforeProduce(msg) + } + + final override def receiveAfterProduce = { + case msg => onReceiveAfterProduce(msg) + } + + /** + * Default implementation of UntypedActor.onReceive + */ + def onReceive(message: Any) = produce(message) + + /** + * Returns the Camel endpoint URI to produce messages to. + */ + def getEndpointUri(): String + + /** + * If set to false (default), this producer expects a response message from the Camel endpoint. + * If set to true, this producer communicates with the Camel endpoint with an in-only message + * exchange pattern (fire and forget). + */ + def isOneway() = super.oneway + + /** + * Called before the message is sent to the endpoint specified by getEndpointUri. The original + * message is passed as argument. By default, this method simply returns the argument but may be overridden + * by subclasses. + */ + @throws(classOf[Exception]) + def onReceiveBeforeProduce(message: Any): Any = super.receiveBeforeProduce(message) + + /** + * Called after the a result was received from the endpoint specified by getEndpointUri. The + * result is passed as argument. By default, this method replies the result back to the original sender + * if isOneway returns false. If isOneway returns true then nothing is done. This + * method may be overridden by subclasses. + */ + @throws(classOf[Exception]) + def onReceiveAfterProduce(message: Any): Unit = super.receiveAfterProduce(message) +} + +/** + * Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java. + * + * @author Martin Krasser + */ +abstract class UntypedProducerActor extends UntypedActor with UntypedProducer + /** * @author Martin Krasser */ diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java index d19ca123a1..c35bd92b71 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java @@ -1,11 +1,11 @@ package se.scalablesolutions.akka.camel; -import java.net.InetSocketAddress; +import se.scalablesolutions.akka.camel.RemoteUntypedConsumerActor; /** * @author Martin Krasser */ -public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumer { +public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor { public SampleRemoteUntypedConsumer() { this("localhost", 7774); diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java index 248882d5b6..303b4302f3 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java @@ -1,9 +1,11 @@ package se.scalablesolutions.akka.camel; +import se.scalablesolutions.akka.camel.UntypedConsumerActor; + /** * @author Martin Krasser */ -public class SampleUntypedConsumer extends UntypedConsumer { +public class SampleUntypedConsumer extends UntypedConsumerActor { public String getEndpointUri() { return "direct:test-untyped-consumer"; diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java index 80cbabdf89..c653d421bc 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel; /** * @author Martin Krasser */ -public class SampleUntypedConsumerBlocking extends UntypedConsumer { +public class SampleUntypedConsumerBlocking extends UntypedConsumerActor { public String getEndpointUri() { return "direct:test-untyped-consumer-blocking"; diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java new file mode 100644 index 0000000000..e909947de8 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java @@ -0,0 +1,18 @@ +package se.scalablesolutions.akka.camel; + +/** + * @author Martin Krasser + */ +public class SampleUntypedForwardingProducer extends UntypedProducerActor { + + public String getEndpointUri() { + return "direct:producer-test-1"; + } + + @Override + public void onReceiveAfterProduce(Object message) { + Message msg = (Message)message; + String body = msg.bodyAs(String.class); + CamelContextManager.template().sendBody("direct:forward-test-1", body); + } +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java new file mode 100644 index 0000000000..cc3fbf110d --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java @@ -0,0 +1,12 @@ +package se.scalablesolutions.akka.camel; + +/** + * @author Martin Krasser + */ +public class SampleUntypedReplyingProducer extends UntypedProducerActor { + + public String getEndpointUri() { + return "direct:producer-test-1"; + } + +} diff --git a/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala b/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala new file mode 100644 index 0000000000..c8a0bd8542 --- /dev/null +++ b/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala @@ -0,0 +1,98 @@ +package se.scalablesolutions.akka.camel + +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.component.mock.MockEndpoint +import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} + +import se.scalablesolutions.akka.actor.UntypedActor._ +import se.scalablesolutions.akka.actor.ActorRegistry + +class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { + import UntypedProducerFeatureTest._ + + override protected def beforeAll = { + ActorRegistry.shutdownAll + CamelContextManager.init + CamelContextManager.context.addRoutes(new TestRoute) + CamelContextManager.start + } + + override protected def afterAll = { + CamelContextManager.stop + ActorRegistry.shutdownAll + } + + override protected def afterEach = { + mockEndpoint.reset + } + + feature("Produce a message to a sync Camel route") { + + scenario("produce message and receive normal response") { + given("a registered two-way producer") + val producer = actorOf(classOf[SampleUntypedReplyingProducer]) + producer.start + + when("a test message is sent to the producer with !!") + val message = Message("test", Map(Message.MessageExchangeId -> "123")) + val result = producer.sendRequestReply(message) + + then("a normal response should have been returned by the producer") + val expected = Message("received test", Map(Message.MessageExchangeId -> "123")) + assert(result === expected) + } + + scenario("produce message and receive failure response") { + given("a registered two-way producer") + val producer = actorOf(classOf[SampleUntypedReplyingProducer]) + producer.start + + when("a test message causing an exception is sent to the producer with !!") + val message = Message("fail", Map(Message.MessageExchangeId -> "123")) + val result = producer.sendRequestReply(message).asInstanceOf[Failure] + + then("a failure response should have been returned by the producer") + val expectedFailureText = result.cause.getMessage + val expectedHeaders = result.headers + assert(expectedFailureText === "failure") + assert(expectedHeaders === Map(Message.MessageExchangeId -> "123")) + } + + } + + feature("Produce a message to a sync Camel route and then forward the response") { + + scenario("produce message and send normal response to direct:forward-test-1") { + given("a registered one-way producer configured with a forward target") + val producer = actorOf(classOf[SampleUntypedForwardingProducer]) + producer.start + + when("a test message is sent to the producer with !") + mockEndpoint.expectedBodiesReceived("received test") + val result = producer.sendOneWay(Message("test"), producer) + + then("a normal response should have been sent") + mockEndpoint.assertIsSatisfied + } + + } + + private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint]) +} + +object UntypedProducerFeatureTest { + class TestRoute extends RouteBuilder { + def configure { + from("direct:forward-test-1").to("mock:mock") + from("direct:producer-test-1").process(new Processor() { + def process(exchange: Exchange) = { + exchange.getIn.getBody match { + case "fail" => throw new Exception("failure") + case body => exchange.getOut.setBody("received %s" format body) + } + } + }) + } + } +}