Java API for customizing routes to consumer actors

This commit is contained in:
Martin Krasser 2010-11-08 14:13:50 +01:00
parent 3962c56ff7
commit 1913b32738
5 changed files with 92 additions and 3 deletions

View file

@ -10,6 +10,7 @@ import org.apache.camel.{Exchange, Processor}
import org.apache.camel.model.{RouteDefinition, ProcessorDefinition}
import akka.actor._
import akka.japi.{Function => JFunction}
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
@ -66,6 +67,12 @@ trait UntypedConsumer extends Consumer { self: UntypedActor =>
* doesn't have any effect on one-way communications (they'll never block).
*/
def isBlocking() = super.blocking
/**
* Sets the route definition handler for creating a custom route to this consumer instance.
*/
def onRouteDefinition(h: JFunction[RouteDefinition, ProcessorDefinition[_]]): Unit =
onRouteDefinition { rd: RouteDefinition => h(rd) }
}
/**

View file

@ -0,0 +1,42 @@
package akka.camel;
import akka.actor.ActorRegistry;
import akka.actor.UntypedActor;
import akka.japi.SideEffect;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static akka.camel.CamelContextManager.*;
import static akka.camel.CamelServiceManager.*;
import static org.junit.Assert.*;
/**
* @author Martin Krasser
*/
public class ConsumerJavaTestBase {
@BeforeClass
public static void setUpBeforeClass() {
startCamelService();
}
@AfterClass
public static void tearDownAfterClass() {
stopCamelService();
ActorRegistry.shutdownAll();
}
@Test
public void shouldHandleExceptionAndGenerateCustomResponse() {
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
public void apply() {
UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start();
}
});
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class);
assertEquals("error: hello", result);
}
}

View file

@ -0,0 +1,35 @@
package akka.camel;
import akka.japi.Function;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
/**
* @author Martin Krasser
*/
public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:error-handler-test-java";
}
public boolean isBlocking() {
return true;
}
public void preStart() {
onRouteDefinition(new Function<RouteDefinition, ProcessorDefinition<?>>() {
public ProcessorDefinition<?> apply(RouteDefinition rd) {
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
}
});
}
public void onReceive(Object message) throws Exception {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
throw new Exception(String.format("error: %s", body));
}
}

View file

@ -0,0 +1,5 @@
package akka.camel
import org.scalatest.junit.JUnitSuite
class ConsumerJavaTest extends ConsumerJavaTestBase with JUnitSuite

View file

@ -14,9 +14,9 @@ import akka.actor._
/**
* @author Martin Krasser
*/
class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
import CamelContextManager.mandatoryTemplate
import ConsumerTest._
import ConsumerScalaTest._
var service: CamelService = _
@ -195,7 +195,7 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
}
}
object ConsumerTest {
object ConsumerScalaTest {
trait BlockingConsumer extends Consumer { self: Actor =>
override def blocking = true
}