diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index abf4bcd875..b95856ad95 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -10,6 +10,7 @@ import org.apache.camel.{Exchange, Processor} import org.apache.camel.model.{RouteDefinition, ProcessorDefinition} import akka.actor._ +import akka.japi.{Function => JFunction} /** * Mixed in by Actor implementations that consume message from Camel endpoints. @@ -66,6 +67,12 @@ trait UntypedConsumer extends Consumer { self: UntypedActor => * doesn't have any effect on one-way communications (they'll never block). */ def isBlocking() = super.blocking + + /** + * Sets the route definition handler for creating a custom route to this consumer instance. + */ + def onRouteDefinition(h: JFunction[RouteDefinition, ProcessorDefinition[_]]): Unit = + onRouteDefinition { rd: RouteDefinition => h(rd) } } /** diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java new file mode 100644 index 0000000000..b3c53327cd --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -0,0 +1,42 @@ +package akka.camel; + +import akka.actor.ActorRegistry; +import akka.actor.UntypedActor; +import akka.japi.SideEffect; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static akka.camel.CamelContextManager.*; +import static akka.camel.CamelServiceManager.*; + +import static org.junit.Assert.*; + +/** + * @author Martin Krasser + */ +public class ConsumerJavaTestBase { + + @BeforeClass + public static void setUpBeforeClass() { + startCamelService(); + } + + @AfterClass + public static void tearDownAfterClass() { + stopCamelService(); + ActorRegistry.shutdownAll(); + } + + @Test + public void shouldHandleExceptionAndGenerateCustomResponse() { + getMandatoryService().awaitEndpointActivation(1, new SideEffect() { + public void apply() { + UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start(); + } + }); + String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class); + assertEquals("error: hello", result); + } +} diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java new file mode 100644 index 0000000000..4fec485cc5 --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java @@ -0,0 +1,35 @@ +package akka.camel; + +import akka.japi.Function; +import org.apache.camel.builder.Builder; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.RouteDefinition; + +/** + * @author Martin Krasser + */ +public class SampleErrorHandlingConsumer extends UntypedConsumerActor { + + public String getEndpointUri() { + return "direct:error-handler-test-java"; + } + + public boolean isBlocking() { + return true; + } + + public void preStart() { + onRouteDefinition(new Function>() { + public ProcessorDefinition apply(RouteDefinition rd) { + return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + } + }); + } + + public void onReceive(Object message) throws Exception { + Message msg = (Message)message; + String body = msg.getBodyAs(String.class); + throw new Exception(String.format("error: %s", body)); + } + +} diff --git a/akka-camel/src/test/scala/ConsumerJavaTest.scala b/akka-camel/src/test/scala/ConsumerJavaTest.scala new file mode 100644 index 0000000000..48741dda96 --- /dev/null +++ b/akka-camel/src/test/scala/ConsumerJavaTest.scala @@ -0,0 +1,5 @@ +package akka.camel + +import org.scalatest.junit.JUnitSuite + +class ConsumerJavaTest extends ConsumerJavaTestBase with JUnitSuite \ No newline at end of file diff --git a/akka-camel/src/test/scala/ConsumerTest.scala b/akka-camel/src/test/scala/ConsumerScalaTest.scala similarity index 98% rename from akka-camel/src/test/scala/ConsumerTest.scala rename to akka-camel/src/test/scala/ConsumerScalaTest.scala index a5a6e9b903..ddbe757a3f 100644 --- a/akka-camel/src/test/scala/ConsumerTest.scala +++ b/akka-camel/src/test/scala/ConsumerScalaTest.scala @@ -14,9 +14,9 @@ import akka.actor._ /** * @author Martin Krasser */ -class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { +class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatchers { import CamelContextManager.mandatoryTemplate - import ConsumerTest._ + import ConsumerScalaTest._ var service: CamelService = _ @@ -195,7 +195,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } } -object ConsumerTest { +object ConsumerScalaTest { trait BlockingConsumer extends Consumer { self: Actor => override def blocking = true }