diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index f0976d5ed5..3f593e1e36 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -30,14 +30,16 @@ trait Consumer { self: Actor => * Java-friendly {@link Consumer} inherited by * *
pattern from the endpoint specified by
* endpointUri.
@@ -162,6 +157,78 @@ trait Producer { this: Actor =>
}
}
+/**
+ * Mixed in by Actor implementations that produce messages to Camel endpoints.
+ */
+trait Producer extends ProducerSupport { this: Actor =>
+
+ /**
+ * Default implementation of Actor.receive
+ */
+ protected def receive = produce
+}
+
+/**
+ * Java-friendly {@link ProducerSupport} inherited by {@link UntypedProducerActor} implementations.
+ *
+ * @author Martin Krasser
+ */
+trait UntypedProducer extends ProducerSupport { this: UntypedActor =>
+
+ final override def endpointUri = getEndpointUri
+
+ final override def oneway = isOneway
+
+ final override def receiveBeforeProduce = {
+ case msg => onReceiveBeforeProduce(msg)
+ }
+
+ final override def receiveAfterProduce = {
+ case msg => onReceiveAfterProduce(msg)
+ }
+
+ /**
+ * Default implementation of UntypedActor.onReceive
+ */
+ def onReceive(message: Any) = produce(message)
+
+ /**
+ * Returns the Camel endpoint URI to produce messages to.
+ */
+ def getEndpointUri(): String
+
+ /**
+ * If set to false (default), this producer expects a response message from the Camel endpoint.
+ * If set to true, this producer communicates with the Camel endpoint with an in-only message
+ * exchange pattern (fire and forget).
+ */
+ def isOneway() = super.oneway
+
+ /**
+ * Called before the message is sent to the endpoint specified by getEndpointUri. The original
+ * message is passed as argument. By default, this method simply returns the argument but may be overridden
+ * by subclasses.
+ */
+ @throws(classOf[Exception])
+ def onReceiveBeforeProduce(message: Any): Any = super.receiveBeforeProduce(message)
+
+ /**
+ * Called after the a result was received from the endpoint specified by getEndpointUri. The
+ * result is passed as argument. By default, this method replies the result back to the original sender
+ * if isOneway returns false. If isOneway returns true then nothing is done. This
+ * method may be overridden by subclasses.
+ */
+ @throws(classOf[Exception])
+ def onReceiveAfterProduce(message: Any): Unit = super.receiveAfterProduce(message)
+}
+
+/**
+ * Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java.
+ *
+ * @author Martin Krasser
+ */
+abstract class UntypedProducerActor extends UntypedActor with UntypedProducer
+
/**
* @author Martin Krasser
*/
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java
index d19ca123a1..c35bd92b71 100644
--- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java
@@ -1,11 +1,11 @@
package se.scalablesolutions.akka.camel;
-import java.net.InetSocketAddress;
+import se.scalablesolutions.akka.camel.RemoteUntypedConsumerActor;
/**
* @author Martin Krasser
*/
-public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumer {
+public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor {
public SampleRemoteUntypedConsumer() {
this("localhost", 7774);
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java
index 248882d5b6..303b4302f3 100644
--- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java
@@ -1,9 +1,11 @@
package se.scalablesolutions.akka.camel;
+import se.scalablesolutions.akka.camel.UntypedConsumerActor;
+
/**
* @author Martin Krasser
*/
-public class SampleUntypedConsumer extends UntypedConsumer {
+public class SampleUntypedConsumer extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:test-untyped-consumer";
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java
index 80cbabdf89..c653d421bc 100644
--- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java
@@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel;
/**
* @author Martin Krasser
*/
-public class SampleUntypedConsumerBlocking extends UntypedConsumer {
+public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:test-untyped-consumer-blocking";
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java
new file mode 100644
index 0000000000..e909947de8
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java
@@ -0,0 +1,18 @@
+package se.scalablesolutions.akka.camel;
+
+/**
+ * @author Martin Krasser
+ */
+public class SampleUntypedForwardingProducer extends UntypedProducerActor {
+
+ public String getEndpointUri() {
+ return "direct:producer-test-1";
+ }
+
+ @Override
+ public void onReceiveAfterProduce(Object message) {
+ Message msg = (Message)message;
+ String body = msg.bodyAs(String.class);
+ CamelContextManager.template().sendBody("direct:forward-test-1", body);
+ }
+}
diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java
new file mode 100644
index 0000000000..cc3fbf110d
--- /dev/null
+++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedReplyingProducer.java
@@ -0,0 +1,12 @@
+package se.scalablesolutions.akka.camel;
+
+/**
+ * @author Martin Krasser
+ */
+public class SampleUntypedReplyingProducer extends UntypedProducerActor {
+
+ public String getEndpointUri() {
+ return "direct:producer-test-1";
+ }
+
+}
diff --git a/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala b/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala
new file mode 100644
index 0000000000..c8a0bd8542
--- /dev/null
+++ b/akka-camel/src/test/scala/UntypedProducerFeatureTest.scala
@@ -0,0 +1,98 @@
+package se.scalablesolutions.akka.camel
+
+import org.apache.camel.{Exchange, Processor}
+import org.apache.camel.builder.RouteBuilder
+import org.apache.camel.component.mock.MockEndpoint
+import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
+
+import se.scalablesolutions.akka.actor.UntypedActor._
+import se.scalablesolutions.akka.actor.ActorRegistry
+
+class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
+ import UntypedProducerFeatureTest._
+
+ override protected def beforeAll = {
+ ActorRegistry.shutdownAll
+ CamelContextManager.init
+ CamelContextManager.context.addRoutes(new TestRoute)
+ CamelContextManager.start
+ }
+
+ override protected def afterAll = {
+ CamelContextManager.stop
+ ActorRegistry.shutdownAll
+ }
+
+ override protected def afterEach = {
+ mockEndpoint.reset
+ }
+
+ feature("Produce a message to a sync Camel route") {
+
+ scenario("produce message and receive normal response") {
+ given("a registered two-way producer")
+ val producer = actorOf(classOf[SampleUntypedReplyingProducer])
+ producer.start
+
+ when("a test message is sent to the producer with !!")
+ val message = Message("test", Map(Message.MessageExchangeId -> "123"))
+ val result = producer.sendRequestReply(message)
+
+ then("a normal response should have been returned by the producer")
+ val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
+ assert(result === expected)
+ }
+
+ scenario("produce message and receive failure response") {
+ given("a registered two-way producer")
+ val producer = actorOf(classOf[SampleUntypedReplyingProducer])
+ producer.start
+
+ when("a test message causing an exception is sent to the producer with !!")
+ val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
+ val result = producer.sendRequestReply(message).asInstanceOf[Failure]
+
+ then("a failure response should have been returned by the producer")
+ val expectedFailureText = result.cause.getMessage
+ val expectedHeaders = result.headers
+ assert(expectedFailureText === "failure")
+ assert(expectedHeaders === Map(Message.MessageExchangeId -> "123"))
+ }
+
+ }
+
+ feature("Produce a message to a sync Camel route and then forward the response") {
+
+ scenario("produce message and send normal response to direct:forward-test-1") {
+ given("a registered one-way producer configured with a forward target")
+ val producer = actorOf(classOf[SampleUntypedForwardingProducer])
+ producer.start
+
+ when("a test message is sent to the producer with !")
+ mockEndpoint.expectedBodiesReceived("received test")
+ val result = producer.sendOneWay(Message("test"), producer)
+
+ then("a normal response should have been sent")
+ mockEndpoint.assertIsSatisfied
+ }
+
+ }
+
+ private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
+}
+
+object UntypedProducerFeatureTest {
+ class TestRoute extends RouteBuilder {
+ def configure {
+ from("direct:forward-test-1").to("mock:mock")
+ from("direct:producer-test-1").process(new Processor() {
+ def process(exchange: Exchange) = {
+ exchange.getIn.getBody match {
+ case "fail" => throw new Exception("failure")
+ case body => exchange.getOut.setBody("received %s" format body)
+ }
+ }
+ })
+ }
+ }
+}