2012-07-18 08:06:07 +02:00
|
|
|
package akka.camel;
|
|
|
|
|
|
|
|
|
|
import akka.actor.*;
|
|
|
|
|
import akka.camel.internal.component.CamelPath;
|
|
|
|
|
import akka.camel.javaapi.UntypedConsumerActor;
|
|
|
|
|
import akka.camel.javaapi.UntypedProducerActor;
|
2012-07-23 15:49:19 +02:00
|
|
|
import scala.concurrent.util.FiniteDuration;
|
2012-07-18 08:06:07 +02:00
|
|
|
import org.apache.camel.CamelExecutionException;
|
|
|
|
|
import org.apache.camel.Exchange;
|
|
|
|
|
import org.apache.camel.Predicate;
|
|
|
|
|
import org.apache.camel.builder.RouteBuilder;
|
|
|
|
|
import org.apache.camel.component.mock.MockEndpoint;
|
2012-07-22 13:27:24 +02:00
|
|
|
import org.junit.Before;
|
|
|
|
|
import org.junit.After;
|
2012-07-18 08:06:07 +02:00
|
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
public class CustomRouteTestBase {
|
|
|
|
|
private static Camel camel;
|
|
|
|
|
private static ActorSystem system;
|
|
|
|
|
|
2012-07-22 13:27:24 +02:00
|
|
|
@Before
|
|
|
|
|
public void before() {
|
2012-07-18 08:06:07 +02:00
|
|
|
system = ActorSystem.create("test");
|
|
|
|
|
camel = (Camel) CamelExtension.get(system);
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-22 13:27:24 +02:00
|
|
|
@After
|
|
|
|
|
public void after() {
|
2012-07-18 08:06:07 +02:00
|
|
|
system.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testCustomProducerRoute() throws Exception {
|
|
|
|
|
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class);
|
|
|
|
|
ActorRef producer = system.actorOf(new Props(MockEndpointProducer.class), "mockEndpoint");
|
|
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
|
|
|
|
|
camel.template().sendBody("direct:test", "test");
|
|
|
|
|
assertMockEndpoint(mockEndpoint);
|
|
|
|
|
system.stop(producer);
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-22 13:27:24 +02:00
|
|
|
@Test
|
|
|
|
|
public void testCustomProducerUriRoute() throws Exception {
|
|
|
|
|
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducerUri", MockEndpoint.class);
|
|
|
|
|
ActorRef producer = system.actorOf(new Props(new UntypedActorFactory(){
|
|
|
|
|
public Actor create() {
|
|
|
|
|
return new EndpointProducer("mock:mockProducerUri");
|
|
|
|
|
}
|
|
|
|
|
}), "mockEndpointUri");
|
|
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
|
|
|
|
|
camel.template().sendBody("direct:test", "test");
|
|
|
|
|
assertMockEndpoint(mockEndpoint);
|
|
|
|
|
system.stop(producer);
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-18 08:06:07 +02:00
|
|
|
@Test
|
|
|
|
|
public void testCustomConsumerRoute() throws Exception {
|
|
|
|
|
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
|
|
|
|
|
ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer");
|
|
|
|
|
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
2012-07-22 13:27:24 +02:00
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
|
|
|
|
|
camel.template().sendBody("direct:testRouteConsumer", "test");
|
2012-07-18 08:06:07 +02:00
|
|
|
assertMockEndpoint(mockEndpoint);
|
|
|
|
|
system.stop(consumer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testCustomAckConsumerRoute() throws Exception {
|
|
|
|
|
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
|
|
|
|
|
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
|
|
|
|
public Actor create() {
|
2012-07-22 13:27:24 +02:00
|
|
|
return new TestAckConsumer("direct:testConsumerAck","mock:mockAck");
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
}), "testConsumerAck");
|
|
|
|
|
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
|
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS)));
|
|
|
|
|
camel.template().sendBody("direct:testAck", "test");
|
|
|
|
|
assertMockEndpoint(mockEndpoint);
|
|
|
|
|
system.stop(consumer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testCustomAckConsumerRouteFromUri() throws Exception {
|
|
|
|
|
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
|
|
|
|
|
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
|
|
|
|
public Actor create() {
|
2012-07-22 13:27:24 +02:00
|
|
|
return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri");
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
}), "testConsumerAckUri");
|
|
|
|
|
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
|
|
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
|
|
|
|
|
camel.template().sendBody("direct:testAckFromUri", "test");
|
|
|
|
|
assertMockEndpoint(mockEndpoint);
|
|
|
|
|
system.stop(consumer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test(expected=CamelExecutionException.class)
|
|
|
|
|
public void testCustomTimeoutConsumerRoute() throws Exception {
|
|
|
|
|
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
|
|
|
|
|
public Actor create() {
|
2012-07-22 13:27:24 +02:00
|
|
|
return new TestAckConsumer("direct:testConsumerException","mock:mockException");
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
}), "testConsumerException");
|
|
|
|
|
camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS));
|
|
|
|
|
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS)));
|
|
|
|
|
camel.template().sendBody("direct:testException", "test");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException {
|
|
|
|
|
mockEndpoint.expectedMessageCount(1);
|
|
|
|
|
mockEndpoint.expectedMessagesMatches(new Predicate() {
|
|
|
|
|
public boolean matches(Exchange exchange) {
|
|
|
|
|
return exchange.getIn().getBody().equals("test");
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
mockEndpoint.assertIsSatisfied();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class CustomRouteBuilder extends RouteBuilder {
|
|
|
|
|
private String uri;
|
|
|
|
|
private String fromUri;
|
|
|
|
|
|
|
|
|
|
public CustomRouteBuilder(String from, String to) {
|
|
|
|
|
fromUri = from;
|
|
|
|
|
uri = to;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public CustomRouteBuilder(String from, ActorRef actor) {
|
|
|
|
|
fromUri = from;
|
|
|
|
|
uri = CamelPath.toUri(actor);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) {
|
|
|
|
|
fromUri = from;
|
|
|
|
|
uri = CamelPath.toUri(actor, autoAck, replyTimeout);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void configure() throws Exception {
|
|
|
|
|
from(fromUri).to(uri);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2012-07-22 13:27:24 +02:00
|
|
|
public static class TestConsumer extends UntypedConsumerActor {
|
2012-07-18 08:06:07 +02:00
|
|
|
@Override
|
|
|
|
|
public String getEndpointUri() {
|
|
|
|
|
return "direct:testconsumer";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onReceive(Object message) {
|
2012-07-22 13:27:24 +02:00
|
|
|
this.getProducerTemplate().sendBody("mock:mockConsumer","test");
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-22 13:27:24 +02:00
|
|
|
public static class EndpointProducer extends UntypedProducerActor {
|
|
|
|
|
private String uri;
|
|
|
|
|
|
|
|
|
|
public EndpointProducer(String uri) {
|
|
|
|
|
this.uri = uri;
|
|
|
|
|
}
|
2012-07-18 08:06:07 +02:00
|
|
|
|
|
|
|
|
public String getEndpointUri() {
|
2012-07-22 13:27:24 +02:00
|
|
|
return uri;
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
2012-07-22 13:27:24 +02:00
|
|
|
public boolean isOneway() {
|
|
|
|
|
return true;
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class MockEndpointProducer extends UntypedProducerActor {
|
|
|
|
|
public String getEndpointUri() {
|
|
|
|
|
return "mock:mockProducer";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean isOneway() {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-07-22 13:27:24 +02:00
|
|
|
|
|
|
|
|
public static class TestAckConsumer extends UntypedConsumerActor {
|
|
|
|
|
private String myuri;
|
|
|
|
|
private String to;
|
|
|
|
|
|
|
|
|
|
public TestAckConsumer(String uri, String to){
|
|
|
|
|
myuri = uri;
|
|
|
|
|
this.to = to;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String getEndpointUri() {
|
|
|
|
|
return myuri;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onReceive(Object message) {
|
|
|
|
|
this.getProducerTemplate().sendBody(to, "test");
|
|
|
|
|
getSender().tell(Ack.getInstance());
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-07-18 08:06:07 +02:00
|
|
|
}
|