diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala
index 72252212cf..fb80b530e3 100644
--- a/akka-camel/src/main/scala/akka/camel/Camel.scala
+++ b/akka-camel/src/main/scala/akka/camel/Camel.scala
@@ -7,6 +7,9 @@ package akka.camel
import internal._
import akka.actor._
import org.apache.camel.{ ProducerTemplate, CamelContext }
+import com.typesafe.config.Config
+import akka.util.Duration
+import java.util.concurrent.TimeUnit._
/**
* Camel trait encapsulates the underlying camel machinery.
@@ -30,6 +33,33 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A
*/
def template: ProducerTemplate
+ /**
+ * The settings for the CamelExtension
+ */
+ def settings: CamelSettings
+}
+
+/**
+ * Settings for the Camel Extension
+ * @param config the config
+ */
+class CamelSettings(val config: Config) {
+ /**
+ * Configured setting for how long the actor should wait for activation before it fails.
+ */
+ final val activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS)
+ /**
+ * Configured setting, When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
+ * the endpoint can take to send the response before the message exchange fails.
+ * This setting is used for out-capable, in-only, manually acknowledged communication.
+ */
+ final val replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
+ /**
+ * Configured setting which determines whether one-way communications between an endpoint and this consumer actor
+ * should be auto-acknowledged or application-acknowledged.
+ * This flag has only effect when exchange is in-only.
+ */
+ final val autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala
index 79e18dcaef..fe92673f84 100644
--- a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala
+++ b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala
@@ -1,14 +1,21 @@
package akka.camel
import akka.actor.Actor
+import com.typesafe.config.Config
+import akka.util.Duration
+import java.util.concurrent.TimeUnit._
private[camel] trait CamelSupport { this: Actor ⇒
+
/**
- * camel extension
+ * For internal use only. Returns a [[akka.camel.Camel]] trait which provides access to the CamelExtension.
*/
- protected[this] implicit def camel = CamelExtension(context.system)
+ protected def camel = CamelExtension(context.system)
+
/**
- * camelContext implicit is useful when using advanced methods of CamelMessage.
+ * Returns the CamelContext.
+ * The camelContext is defined implicit for simplifying the use of CamelMessage from the Scala API.
*/
- protected[this] implicit def camelContext = camel.context
+ protected implicit def camelContext = camel.context
+
}
diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala
index 60b54d6509..a354ae190c 100644
--- a/akka-camel/src/main/scala/akka/camel/Consumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala
@@ -4,7 +4,6 @@
package akka.camel
-import internal.component.DurationTypeConverter
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
import akka.actor._
import akka.util.Duration
@@ -17,10 +16,23 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
* @author Martin Krasser
*/
trait Consumer extends Actor with CamelSupport with ConsumerConfig {
-
+ /**
+ * Must return the Camel endpoint URI that the consumer wants to consume messages from.
+ */
def endpointUri: String
- camel.registerConsumer(endpointUri, this, activationTimeout)
+ /**
+ * Registers the consumer endpoint. Note: when overriding this method, be sure to
+ * call 'super.preRestart', otherwise the consumer endpoint will not be registered.
+ */
+ override def preStart() {
+ super.preStart()
+ // Possible FIXME. registering the endpoint here because of problems
+ // with order of execution of trait body in the Java version (UntypedConsumerActor)
+ // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri)
+ // and remains null. CustomRouteTest provides a test to verify this.
+ camel.registerConsumer(endpointUri, this, activationTimeout)
+ }
}
case object DefaultConsumerParameters {
@@ -28,26 +40,25 @@ case object DefaultConsumerParameters {
val autoAck = true
}
-trait ConsumerConfig { this: Actor ⇒
- private val config = this.context.system.settings.config
+trait ConsumerConfig { this: CamelSupport ⇒
/**
* How long the actor should wait for activation before it fails.
*/
- def activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS)
+ def activationTimeout: Duration = camel.settings.activationTimeout
/**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication.
*/
- def replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
+ def replyTimeout: Duration = camel.settings.replyTimeout
/**
* Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only.
*/
- def autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
+ def autoAck: Boolean = camel.settings.autoAck
/**
* The route definition handler for creating a custom route to this consumer instance.
diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
index 5a239c766c..a596a869ce 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
@@ -5,7 +5,7 @@ import component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._
import akka.event.Logging
-import akka.camel.Camel
+import akka.camel.{ CamelSettings, Camel }
import akka.util.{ NonFatal, Duration }
import org.apache.camel.{ ProducerTemplate, CamelContext }
@@ -33,6 +33,8 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
ctx
}
+ lazy val settings = new CamelSettings(system.settings.config)
+
lazy val template: ProducerTemplate = context.createProducerTemplate()
/**
diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
index b0e72efe34..a47c913b69 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
@@ -35,7 +35,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D
* @see org.apache.camel.Component
*/
def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
- new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel, system)
+ new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel)
}
/**
@@ -54,8 +54,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D
private[camel] class ActorEndpoint(uri: String,
comp: ActorComponent,
val path: ActorEndpointPath,
- camel: Camel,
- val system: ActorSystem) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {
+ val camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {
/**
* The ActorEndpoint only supports receiving messages from Camel.
@@ -89,17 +88,11 @@ private[camel] class ActorEndpoint(uri: String,
*/
private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath
- def system: ActorSystem
- private def config = system.settings.config
- @BeanProperty var replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
+ def camel: Camel
- /**
- * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is
- * set via the autoAck=true|false endpoint URI parameter. Default value is
- * true. When set to false consumer actors need to additionally
- * call Consumer.ack within Actor.receive.
- */
- @BeanProperty var autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
+ @BeanProperty var replyTimeout: Duration = camel.settings.replyTimeout
+
+ @BeanProperty var autoAck: Boolean = camel.settings.autoAck
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
similarity index 50%
rename from akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
rename to akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
index a4671583bb..3cef3d285a 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala
@@ -16,19 +16,26 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer {
final def endpointUri: String = getEndpointUri
/**
- * Returns the Camel endpoint URI to consume messages from.
+ * ''Java API'': Returns the Camel endpoint URI to consume messages from.
*/
def getEndpointUri(): String
/**
- * Returns the [[org.apache.camel.CamelContext]]
+ * ''Java API'': Returns the [[org.apache.camel.CamelContext]]
* @return the CamelContext
*/
- protected def getCamelContext: CamelContext = camelContext
+ protected def getCamelContext(): CamelContext = camelContext
/**
- * Returns the [[org.apache.camel.ProducerTemplate]]
+ * ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]]
* @return the ProducerTemplate
*/
- protected def getProducerTemplate: ProducerTemplate = camel.template
+ protected def getProducerTemplate(): ProducerTemplate = camel.template
+
+ /**
+ * ''Java API'': Returns the [[akka.camel.Activation]] interface
+ * that can be used to wait on activation or de-activation of Camel endpoints.
+ * @return the Activation interface
+ */
+ protected def getActivation(): Activation = camel
}
diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
index f44daf0725..7caf77b35b 100644
--- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
+++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
@@ -70,4 +70,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
* Returns the ProducerTemplate.
*/
def getProducerTemplate(): ProducerTemplate = camel.template
+
+ /**
+ * ''Java API'': Returns the [[akka.camel.Activation]] interface
+ * that can be used to wait on activation or de-activation of Camel endpoints.
+ * @return the Activation interface
+ */
+ def getActivation(): Activation = camel
}
diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
index f97a62dae1..8e7361506d 100644
--- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
+++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java
@@ -10,25 +10,24 @@ import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class CustomRouteTestBase {
-
private static Camel camel;
private static ActorSystem system;
- @BeforeClass
- public static void setUpBeforeClass() {
+ @Before
+ public void before() {
system = ActorSystem.create("test");
camel = (Camel) CamelExtension.get(system);
}
- @AfterClass
- public static void cleanup() {
+ @After
+ public void after() {
system.shutdown();
}
@@ -42,13 +41,27 @@ public class CustomRouteTestBase {
system.stop(producer);
}
+ @Test
+ public void testCustomProducerUriRoute() throws Exception {
+ MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducerUri", MockEndpoint.class);
+ ActorRef producer = system.actorOf(new Props(new UntypedActorFactory(){
+ public Actor create() {
+ return new EndpointProducer("mock:mockProducerUri");
+ }
+ }), "mockEndpointUri");
+ camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
+ camel.template().sendBody("direct:test", "test");
+ assertMockEndpoint(mockEndpoint);
+ system.stop(producer);
+ }
+
@Test
public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
- camel.context().addRoutes(new CustomRouteBuilder("direct:testConsumer",consumer));
- camel.template().sendBody("direct:testConsumer", "test");
+ camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer));
+ camel.template().sendBody("direct:testRouteConsumer", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
}
@@ -58,7 +71,7 @@ public class CustomRouteTestBase {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
- return new TestAckConsumer("mock:mockAck");
+ return new TestAckConsumer("direct:testConsumerAck","mock:mockAck");
}
}), "testConsumerAck");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
@@ -73,7 +86,7 @@ public class CustomRouteTestBase {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
- return new TestAckConsumer("mock:mockAckUri");
+ return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri");
}
}), "testConsumerAckUri");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
@@ -87,7 +100,7 @@ public class CustomRouteTestBase {
public void testCustomTimeoutConsumerRoute() throws Exception {
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
- return new TestAckConsumer("mock:mockAckUri");
+ return new TestAckConsumer("direct:testConsumerException","mock:mockException");
}
}), "testConsumerException");
camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS));
@@ -130,26 +143,6 @@ public class CustomRouteTestBase {
}
}
- public static class TestAckConsumer extends UntypedConsumerActor {
-
- String endpoint;
-
- public TestAckConsumer(String to){
- endpoint = to;
- }
-
- @Override
- public String getEndpointUri() {
- return "direct:testconsumer";
- }
-
- @Override
- public void onReceive(Object message) {
- this.getProducerTemplate().sendBody(endpoint, "test");
- getSender().tell(Ack.getInstance());
- }
- }
-
public static class TestConsumer extends UntypedConsumerActor {
@Override
@@ -163,6 +156,23 @@ public class CustomRouteTestBase {
}
}
+ public static class EndpointProducer extends UntypedProducerActor {
+ private String uri;
+
+ public EndpointProducer(String uri) {
+ this.uri = uri;
+ }
+
+ public String getEndpointUri() {
+ return uri;
+ }
+
+ @Override
+ public boolean isOneway() {
+ return true;
+ }
+ }
+
public static class MockEndpointProducer extends UntypedProducerActor {
public String getEndpointUri() {
return "mock:mockProducer";
@@ -173,4 +183,25 @@ public class CustomRouteTestBase {
return true;
}
}
+
+ public static class TestAckConsumer extends UntypedConsumerActor {
+ private String myuri;
+ private String to;
+
+ public TestAckConsumer(String uri, String to){
+ myuri = uri;
+ this.to = to;
+ }
+
+ @Override
+ public String getEndpointUri() {
+ return myuri;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ this.getProducerTemplate().sendBody(to, "test");
+ getSender().tell(Ack.getInstance());
+ }
+ }
}
diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
index f03150b8ae..383405db61 100644
--- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
@@ -87,6 +87,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
camel.routeCount must be(0)
}
+ "Consumer must register on uri passed in through constructor" in {
+ val consumer = start(new TestActor("direct://test"))
+ camel.awaitActivation(consumer, defaultTimeout seconds)
+
+ camel.routeCount must be > (0)
+ camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
+ system.stop(consumer)
+ camel.awaitDeactivation(consumer, defaultTimeout seconds)
+
+ camel.routeCount must be(0)
+ }
+
"Error passing consumer supports error handling through route modification" in {
start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing {
override def onRouteDefinition(rd: RouteDefinition) = {
diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
index abc50b49e8..b671060c6f 100644
--- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala
@@ -17,14 +17,14 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
import org.mockito.Mockito.{ when, verify }
- def camelWitMocks = new DefaultCamel(mock[ActorSystem]) {
+ def camelWithMocks = new DefaultCamel(mock[ActorSystem]) {
override val log = mock[LoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[CamelContext]
}
"during shutdown, when both context and template fail to shutdown" when {
- val camel = camelWitMocks
+ val camel = camelWithMocks
when(camel.context.stop()) thenThrow new RuntimeException("context")
when(camel.template.stop()) thenThrow new RuntimeException("template")
@@ -44,7 +44,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers
}
"during start, if template fails to start, it will stop the context" in {
- val camel = camelWitMocks
+ val camel = camelWithMocks
when(camel.template.start()) thenThrow new RuntimeException
diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
index fee20267b1..cbc6f43fd7 100644
--- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala
+++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala
@@ -37,6 +37,7 @@ private[camel] object TestSupport {
}
def routeCount = camel.context.getRoutes().size()
+ def routes = camel.context.getRoutes
}
@deprecated
diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
index 891b630a98..bf31755d53 100644
--- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
@@ -5,7 +5,7 @@
package akka.camel.internal.component
import org.scalatest.mock.MockitoSugar
-import org.mockito.Matchers.{ eq ⇒ the, any }
+import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.apache.camel.AsyncCallback
import java.util.concurrent.atomic.AtomicBoolean
@@ -15,7 +15,7 @@ import akka.testkit.{ TestKit, TestProbe }
import java.lang.String
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
import akka.camel._
-import internal.CamelExchangeAdapter
+import internal.{ DefaultCamel, CamelExchangeAdapter }
import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
@@ -269,7 +269,10 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
asyncCallback = createAsyncCallback
probe = TestProbe()
- camel = mock[Camel]
+ camel = camelWithMocks
+ def camelWithMocks = new DefaultCamel(mock[ActorSystem]) {
+ override lazy val settings = mock[CamelSettings]
+ }
exchange = mock[CamelExchangeAdapter]
callback = mock[AsyncCallback]
actorEndpointPath = mock[ActorEndpointPath]
@@ -321,7 +324,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
}
def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = {
- val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel, system)
+ val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel)
endpoint.autoAck = isAutoAck
endpoint.replyTimeout = _replyTimeout
endpoint