From 58c9785db169ca4320547d316a0c5d275d34e017 Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Sun, 22 Jul 2012 13:27:24 +0200 Subject: [PATCH] processed code review, still have to do docs comments --- .../src/main/scala/akka/camel/Camel.scala | 30 ++++++ .../main/scala/akka/camel/CamelSupport.scala | 15 ++- .../src/main/scala/akka/camel/Consumer.scala | 27 ++++-- .../akka/camel/internal/DefaultCamel.scala | 4 +- .../internal/component/ActorComponent.scala | 19 ++-- ...sumer.scala => UntypedConsumerActor.scala} | 17 +++- .../camel/javaapi/UntypedProducerActor.scala | 7 ++ .../java/akka/camel/CustomRouteTestBase.java | 95 ++++++++++++------- .../akka/camel/ConsumerIntegrationTest.scala | 12 +++ .../scala/akka/camel/DefaultCamelTest.scala | 6 +- .../test/scala/akka/camel/TestSupport.scala | 1 + .../component/ActorProducerTest.scala | 11 ++- 12 files changed, 174 insertions(+), 70 deletions(-) rename akka-camel/src/main/scala/akka/camel/javaapi/{UntypedConsumer.scala => UntypedConsumerActor.scala} (50%) diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala index 72252212cf..fb80b530e3 100644 --- a/akka-camel/src/main/scala/akka/camel/Camel.scala +++ b/akka-camel/src/main/scala/akka/camel/Camel.scala @@ -7,6 +7,9 @@ package akka.camel import internal._ import akka.actor._ import org.apache.camel.{ ProducerTemplate, CamelContext } +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit._ /** * Camel trait encapsulates the underlying camel machinery. @@ -30,6 +33,33 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A */ def template: ProducerTemplate + /** + * The settings for the CamelExtension + */ + def settings: CamelSettings +} + +/** + * Settings for the Camel Extension + * @param config the config + */ +class CamelSettings(val config: Config) { + /** + * Configured setting for how long the actor should wait for activation before it fails. + */ + final val activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS) + /** + * Configured setting, When endpoint is out-capable (can produce responses) replyTimeout is the maximum time + * the endpoint can take to send the response before the message exchange fails. + * This setting is used for out-capable, in-only, manually acknowledged communication. + */ + final val replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS) + /** + * Configured setting which determines whether one-way communications between an endpoint and this consumer actor + * should be auto-acknowledged or application-acknowledged. + * This flag has only effect when exchange is in-only. + */ + final val autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck") } /** diff --git a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala index 79e18dcaef..fe92673f84 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala @@ -1,14 +1,21 @@ package akka.camel import akka.actor.Actor +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit._ private[camel] trait CamelSupport { this: Actor ⇒ + /** - * camel extension + * For internal use only. Returns a [[akka.camel.Camel]] trait which provides access to the CamelExtension. */ - protected[this] implicit def camel = CamelExtension(context.system) + protected def camel = CamelExtension(context.system) + /** - * camelContext implicit is useful when using advanced methods of CamelMessage. + * Returns the CamelContext. + * The camelContext is defined implicit for simplifying the use of CamelMessage from the Scala API. */ - protected[this] implicit def camelContext = camel.context + protected implicit def camelContext = camel.context + } diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 60b54d6509..a354ae190c 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -4,7 +4,6 @@ package akka.camel -import internal.component.DurationTypeConverter import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition } import akka.actor._ import akka.util.Duration @@ -17,10 +16,23 @@ import java.util.concurrent.TimeUnit.MILLISECONDS * @author Martin Krasser */ trait Consumer extends Actor with CamelSupport with ConsumerConfig { - + /** + * Must return the Camel endpoint URI that the consumer wants to consume messages from. + */ def endpointUri: String - camel.registerConsumer(endpointUri, this, activationTimeout) + /** + * Registers the consumer endpoint. Note: when overriding this method, be sure to + * call 'super.preRestart', otherwise the consumer endpoint will not be registered. + */ + override def preStart() { + super.preStart() + // Possible FIXME. registering the endpoint here because of problems + // with order of execution of trait body in the Java version (UntypedConsumerActor) + // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri) + // and remains null. CustomRouteTest provides a test to verify this. + camel.registerConsumer(endpointUri, this, activationTimeout) + } } case object DefaultConsumerParameters { @@ -28,26 +40,25 @@ case object DefaultConsumerParameters { val autoAck = true } -trait ConsumerConfig { this: Actor ⇒ - private val config = this.context.system.settings.config +trait ConsumerConfig { this: CamelSupport ⇒ /** * How long the actor should wait for activation before it fails. */ - def activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS) + def activationTimeout: Duration = camel.settings.activationTimeout /** * When endpoint is out-capable (can produce responses) replyTimeout is the maximum time * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute. * This setting is used for out-capable, in-only, manually acknowledged communication. */ - def replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS) + def replyTimeout: Duration = camel.settings.replyTimeout /** * Determines whether one-way communications between an endpoint and this consumer actor * should be auto-acknowledged or application-acknowledged. * This flag has only effect when exchange is in-only. */ - def autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck") + def autoAck: Boolean = camel.settings.autoAck /** * The route definition handler for creating a custom route to this consumer instance. diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 5a239c766c..a596a869ce 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -5,7 +5,7 @@ import component.{ DurationTypeConverter, ActorComponent } import org.apache.camel.impl.DefaultCamelContext import scala.Predef._ import akka.event.Logging -import akka.camel.Camel +import akka.camel.{ CamelSettings, Camel } import akka.util.{ NonFatal, Duration } import org.apache.camel.{ ProducerTemplate, CamelContext } @@ -33,6 +33,8 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { ctx } + lazy val settings = new CamelSettings(system.settings.config) + lazy val template: ProducerTemplate = context.createProducerTemplate() /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index b0e72efe34..a47c913b69 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -35,7 +35,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D * @see org.apache.camel.Component */ def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = - new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel, system) + new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel) } /** @@ -54,8 +54,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D private[camel] class ActorEndpoint(uri: String, comp: ActorComponent, val path: ActorEndpointPath, - camel: Camel, - val system: ActorSystem) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig { + val camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig { /** * The ActorEndpoint only supports receiving messages from Camel. @@ -89,17 +88,11 @@ private[camel] class ActorEndpoint(uri: String, */ private[camel] trait ActorEndpointConfig { def path: ActorEndpointPath - def system: ActorSystem - private def config = system.settings.config - @BeanProperty var replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS) + def camel: Camel - /** - * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is - * set via the autoAck=true|false endpoint URI parameter. Default value is - * true. When set to false consumer actors need to additionally - * call Consumer.ack within Actor.receive. - */ - @BeanProperty var autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck") + @BeanProperty var replyTimeout: Duration = camel.settings.replyTimeout + + @BeanProperty var autoAck: Boolean = camel.settings.autoAck } /** diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala similarity index 50% rename from akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala rename to akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala index a4671583bb..3cef3d285a 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala @@ -16,19 +16,26 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer { final def endpointUri: String = getEndpointUri /** - * Returns the Camel endpoint URI to consume messages from. + * ''Java API'': Returns the Camel endpoint URI to consume messages from. */ def getEndpointUri(): String /** - * Returns the [[org.apache.camel.CamelContext]] + * ''Java API'': Returns the [[org.apache.camel.CamelContext]] * @return the CamelContext */ - protected def getCamelContext: CamelContext = camelContext + protected def getCamelContext(): CamelContext = camelContext /** - * Returns the [[org.apache.camel.ProducerTemplate]] + * ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]] * @return the ProducerTemplate */ - protected def getProducerTemplate: ProducerTemplate = camel.template + protected def getProducerTemplate(): ProducerTemplate = camel.template + + /** + * ''Java API'': Returns the [[akka.camel.Activation]] interface + * that can be used to wait on activation or de-activation of Camel endpoints. + * @return the Activation interface + */ + protected def getActivation(): Activation = camel } diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index f44daf0725..7caf77b35b 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -70,4 +70,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * Returns the ProducerTemplate. */ def getProducerTemplate(): ProducerTemplate = camel.template + + /** + * ''Java API'': Returns the [[akka.camel.Activation]] interface + * that can be used to wait on activation or de-activation of Camel endpoints. + * @return the Activation interface + */ + def getActivation(): Activation = camel } diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index f97a62dae1..8e7361506d 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -10,25 +10,24 @@ import org.apache.camel.Exchange; import org.apache.camel.Predicate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.After; import org.junit.Test; import java.util.concurrent.TimeUnit; public class CustomRouteTestBase { - private static Camel camel; private static ActorSystem system; - @BeforeClass - public static void setUpBeforeClass() { + @Before + public void before() { system = ActorSystem.create("test"); camel = (Camel) CamelExtension.get(system); } - @AfterClass - public static void cleanup() { + @After + public void after() { system.shutdown(); } @@ -42,13 +41,27 @@ public class CustomRouteTestBase { system.stop(producer); } + @Test + public void testCustomProducerUriRoute() throws Exception { + MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducerUri", MockEndpoint.class); + ActorRef producer = system.actorOf(new Props(new UntypedActorFactory(){ + public Actor create() { + return new EndpointProducer("mock:mockProducerUri"); + } + }), "mockEndpointUri"); + camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer)); + camel.template().sendBody("direct:test", "test"); + assertMockEndpoint(mockEndpoint); + system.stop(producer); + } + @Test public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer"); camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); - camel.context().addRoutes(new CustomRouteBuilder("direct:testConsumer",consumer)); - camel.template().sendBody("direct:testConsumer", "test"); + camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); + camel.template().sendBody("direct:testRouteConsumer", "test"); assertMockEndpoint(mockEndpoint); system.stop(consumer); } @@ -58,7 +71,7 @@ public class CustomRouteTestBase { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { - return new TestAckConsumer("mock:mockAck"); + return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"); camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); @@ -73,7 +86,7 @@ public class CustomRouteTestBase { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { - return new TestAckConsumer("mock:mockAckUri"); + return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"); camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); @@ -87,7 +100,7 @@ public class CustomRouteTestBase { public void testCustomTimeoutConsumerRoute() throws Exception { ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { - return new TestAckConsumer("mock:mockAckUri"); + return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"); camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS)); @@ -130,26 +143,6 @@ public class CustomRouteTestBase { } } - public static class TestAckConsumer extends UntypedConsumerActor { - - String endpoint; - - public TestAckConsumer(String to){ - endpoint = to; - } - - @Override - public String getEndpointUri() { - return "direct:testconsumer"; - } - - @Override - public void onReceive(Object message) { - this.getProducerTemplate().sendBody(endpoint, "test"); - getSender().tell(Ack.getInstance()); - } - } - public static class TestConsumer extends UntypedConsumerActor { @Override @@ -163,6 +156,23 @@ public class CustomRouteTestBase { } } + public static class EndpointProducer extends UntypedProducerActor { + private String uri; + + public EndpointProducer(String uri) { + this.uri = uri; + } + + public String getEndpointUri() { + return uri; + } + + @Override + public boolean isOneway() { + return true; + } + } + public static class MockEndpointProducer extends UntypedProducerActor { public String getEndpointUri() { return "mock:mockProducer"; @@ -173,4 +183,25 @@ public class CustomRouteTestBase { return true; } } + + public static class TestAckConsumer extends UntypedConsumerActor { + private String myuri; + private String to; + + public TestAckConsumer(String uri, String to){ + myuri = uri; + this.to = to; + } + + @Override + public String getEndpointUri() { + return myuri; + } + + @Override + public void onReceive(Object message) { + this.getProducerTemplate().sendBody(to, "test"); + getSender().tell(Ack.getInstance()); + } + } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index f03150b8ae..383405db61 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -87,6 +87,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC camel.routeCount must be(0) } + "Consumer must register on uri passed in through constructor" in { + val consumer = start(new TestActor("direct://test")) + camel.awaitActivation(consumer, defaultTimeout seconds) + + camel.routeCount must be > (0) + camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test") + system.stop(consumer) + camel.awaitDeactivation(consumer, defaultTimeout seconds) + + camel.routeCount must be(0) + } + "Error passing consumer supports error handling through route modification" in { start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { override def onRouteDefinition(rd: RouteDefinition) = { diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala index abc50b49e8..b671060c6f 100644 --- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala +++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala @@ -17,14 +17,14 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers import org.mockito.Mockito.{ when, verify } - def camelWitMocks = new DefaultCamel(mock[ActorSystem]) { + def camelWithMocks = new DefaultCamel(mock[ActorSystem]) { override val log = mock[LoggingAdapter] override lazy val template = mock[ProducerTemplate] override lazy val context = mock[CamelContext] } "during shutdown, when both context and template fail to shutdown" when { - val camel = camelWitMocks + val camel = camelWithMocks when(camel.context.stop()) thenThrow new RuntimeException("context") when(camel.template.stop()) thenThrow new RuntimeException("template") @@ -44,7 +44,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers } "during start, if template fails to start, it will stop the context" in { - val camel = camelWitMocks + val camel = camelWithMocks when(camel.template.start()) thenThrow new RuntimeException diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index fee20267b1..cbc6f43fd7 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -37,6 +37,7 @@ private[camel] object TestSupport { } def routeCount = camel.context.getRoutes().size() + def routes = camel.context.getRoutes } @deprecated diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 891b630a98..bf31755d53 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -5,7 +5,7 @@ package akka.camel.internal.component import org.scalatest.mock.MockitoSugar -import org.mockito.Matchers.{ eq ⇒ the, any } +import org.mockito.Matchers.any import org.mockito.Mockito._ import org.apache.camel.AsyncCallback import java.util.concurrent.atomic.AtomicBoolean @@ -15,7 +15,7 @@ import akka.testkit.{ TestKit, TestProbe } import java.lang.String import akka.actor.{ ActorRef, Props, ActorSystem, Actor } import akka.camel._ -import internal.CamelExchangeAdapter +import internal.{ DefaultCamel, CamelExchangeAdapter } import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } import akka.camel.TestSupport._ import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } @@ -269,7 +269,10 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo asyncCallback = createAsyncCallback probe = TestProbe() - camel = mock[Camel] + camel = camelWithMocks + def camelWithMocks = new DefaultCamel(mock[ActorSystem]) { + override lazy val settings = mock[CamelSettings] + } exchange = mock[CamelExchangeAdapter] callback = mock[AsyncCallback] actorEndpointPath = mock[ActorEndpointPath] @@ -321,7 +324,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = { - val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel, system) + val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel) endpoint.autoAck = isAutoAck endpoint.replyTimeout = _replyTimeout endpoint