diff --git a/akka-camel/src/main/resources/reference.conf b/akka-camel/src/main/resources/reference.conf new file mode 100644 index 0000000000..658c729fb3 --- /dev/null +++ b/akka-camel/src/main/resources/reference.conf @@ -0,0 +1,16 @@ +#################################### +# Akka Camel Reference Config File # +#################################### + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +akka { + camel { + consumer { + autoAck = true + replyTimeout = 1m + activationTimeout = 10s + } + } +} diff --git a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala index 6286edad87..ce8d19bec6 100644 --- a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala +++ b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala @@ -4,9 +4,10 @@ package akka.camel -import internal.component.ActorEndpointPath +import internal.component.CamelPath import akka.actor.ActorRef import org.apache.camel.model.ProcessorDefinition +import akka.util.Duration /** * Wraps a [[org.apache.camel.model.ProcessorDefinition]]. @@ -23,22 +24,25 @@ import org.apache.camel.model.ProcessorDefinition * }}} * @param definition the processor definition */ -class ActorRouteDefinition(definition: ProcessorDefinition[_]) { +class ActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) { /** * Sends the message to an ActorRef endpoint. - * @param actorRef the consumer with a default configuration. + * @param actorRef the actorRef to the actor. * @return the path to the actor, as a camel uri String */ - def to(actorRef: ActorRef) = //FIXME What is the return type of this? - definition.to(ActorEndpointPath(actorRef).toCamelPath()) + def to(actorRef: ActorRef): T = definition.to(CamelPath.toUri(actorRef)) /** * Sends the message to an ActorRef endpoint * @param actorRef the consumer - * @param consumerConfig the configuration for the consumer + * @param autoAck Determines whether one-way communications between an endpoint and this consumer actor + * should be auto-acknowledged or application-acknowledged. + * This flag has only effect when exchange is in-only. + * @param replyTimeout When endpoint is out-capable (can produce responses) replyTimeout is the maximum time + * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute. + * This setting is used for out-capable, in-only, manually acknowledged communication. * @return the path to the actor, as a camel uri String */ - def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = //FIXME What is the return type of this? - definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig)) + def to(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): T = definition.to(CamelPath.toUri(actorRef, autoAck, replyTimeout)) } diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index cb4121189d..fc29e08382 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -229,7 +229,7 @@ object CamelMessage { /** * Positive acknowledgement message (used for application-acknowledged message receipts). - * When `autoack` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage. + * When `autoAck` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage. * @author Martin Krasser */ case object Ack { diff --git a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala new file mode 100644 index 0000000000..79e18dcaef --- /dev/null +++ b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala @@ -0,0 +1,14 @@ +package akka.camel + +import akka.actor.Actor + +private[camel] trait CamelSupport { this: Actor ⇒ + /** + * camel extension + */ + protected[this] implicit def camel = CamelExtension(context.system) + /** + * camelContext implicit is useful when using advanced methods of CamelMessage. + */ + protected[this] implicit def camelContext = camel.context +} diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 0351ce39cb..60b54d6509 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -6,47 +6,48 @@ package akka.camel import internal.component.DurationTypeConverter import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition } - import akka.actor._ import akka.util.Duration import akka.util.duration._ +import java.util.concurrent.TimeUnit.MILLISECONDS /** * Mixed in by Actor implementations that consume message from Camel endpoints. * * @author Martin Krasser */ -trait Consumer extends Actor with ConsumerConfig { +trait Consumer extends Actor with CamelSupport with ConsumerConfig { def endpointUri: String - protected[this] implicit def camel = CamelExtension(context.system) - protected[this] implicit def camelContext = camel.context - camel.registerConsumer(endpointUri, this, activationTimeout) } -trait ConsumerConfig { +case object DefaultConsumerParameters { + val replyTimeout = 1 minute + val autoAck = true +} +trait ConsumerConfig { this: Actor ⇒ + private val config = this.context.system.settings.config /** * How long the actor should wait for activation before it fails. */ - def activationTimeout: Duration = 10 seconds // FIXME Should be configured in reference.conf + def activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS) /** * When endpoint is out-capable (can produce responses) replyTimeout is the maximum time * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute. * This setting is used for out-capable, in-only, manually acknowledged communication. - * When the blocking is set to Blocking replyTimeout is ignored. */ - def replyTimeout: Duration = 1 minute // FIXME Should be configured in reference.conf + def replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS) /** * Determines whether one-way communications between an endpoint and this consumer actor * should be auto-acknowledged or application-acknowledged. * This flag has only effect when exchange is in-only. */ - def autoack: Boolean = true // FIXME Should be configured in reference.conf + def autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck") /** * The route definition handler for creating a custom route to this consumer instance. @@ -54,9 +55,4 @@ trait ConsumerConfig { //FIXME: write a test confirming onRouteDefinition gets called def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd - /** - * For internal use only. Converts this ConsumerConfig to camel URI parameters - * @return - */ - private[camel] def toCamelParameters: String = "autoack=%s&replyTimeout=%s" format (autoack, DurationTypeConverter.toString(replyTimeout)) } diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 5a7262a133..335fb7ef1e 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -15,13 +15,7 @@ import org.apache.camel.processor.SendProcessor * * @author Martin Krasser */ -trait ProducerSupport { this: Actor ⇒ - protected[this] implicit def camel = CamelExtension(context.system) // FIXME This is duplicated from Consumer, create a common base-trait? - - /** - * camelContext implicit is useful when using advanced methods of CamelMessage. - */ - protected[this] implicit def camelContext = camel.context // FIXME This is duplicated from Consumer, create a common base-trait? +trait ProducerSupport extends CamelSupport { this: Actor ⇒ protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri) diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index e915c9cd28..ec24832396 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -5,7 +5,7 @@ package akka.camel.internal import akka.camel._ -import component.ActorEndpointPath +import component.CamelPath import java.io.InputStream import org.apache.camel.builder.RouteBuilder @@ -122,7 +122,7 @@ private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorR */ private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { - protected def targetActorUri = ActorEndpointPath(consumer).toCamelPath(config) + protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout) def configure() { val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index e390c799e9..5a239c766c 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -28,7 +28,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { val ctx = new DefaultCamelContext ctx.setName(system.name) ctx.setStreamCaching(true) - ctx.addComponent("actor", new ActorComponent(this)) + ctx.addComponent("akka", new ActorComponent(this, system)) ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter) ctx } diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index cbadc08c8b..b0e72efe34 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -13,12 +13,11 @@ import akka.actor._ import akka.pattern._ import scala.reflect.BeanProperty -import akka.util.duration._ import java.util.concurrent.{ TimeoutException, CountDownLatch } import akka.camel.internal.CamelExchangeAdapter import akka.util.{ NonFatal, Duration, Timeout } -import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } - +import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage } +import java.util.concurrent.TimeUnit.MILLISECONDS /** * For internal use only. * Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]]. @@ -31,12 +30,12 @@ import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, Fai * * @author Martin Krasser */ -private[camel] class ActorComponent(camel: Camel) extends DefaultComponent { +private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends DefaultComponent { /** * @see org.apache.camel.Component */ def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = - new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(remaining), camel) + new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel, system) } /** @@ -47,7 +46,7 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent { * The `ActorEndpoint`s are created by the [[akka.camel.internal.component.ActorComponent]]. * * Actors are referenced using actor endpoint URIs of the following format: - * actor://path:[actorPath]?[options]%s, + * [actorPath]?[options]%s, * where [actorPath] refers to the actor path to the actor. * * @author Martin Krasser @@ -55,14 +54,15 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent { private[camel] class ActorEndpoint(uri: String, comp: ActorComponent, val path: ActorEndpointPath, - camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig { + camel: Camel, + val system: ActorSystem) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig { /** * The ActorEndpoint only supports receiving messages from Camel. * The createProducer method (not to be confused with a producer actor) is used to send messages into the endpoint. * The ActorComponent is only there to send to actors registered through an actor endpoint URI. * You can use an actor as an endpoint to send to in a camel route (as in, a Camel Consumer Actor). so from(someuri) to (actoruri), but not 'the other way around'. - * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages + * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages. * [[akka.camel.Producer]] Actors can be used for sending messages to some other uri/ component type registered in Camel. * @throws UnsupportedOperationException this method is not supported */ @@ -89,16 +89,17 @@ private[camel] class ActorEndpoint(uri: String, */ private[camel] trait ActorEndpointConfig { def path: ActorEndpointPath - - @BeanProperty var replyTimeout: Duration = 1 minute // FIXME default should be in config, not code + def system: ActorSystem + private def config = system.settings.config + @BeanProperty var replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS) /** * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is - * set via the autoack=true|false endpoint URI parameter. Default value is + * set via the autoAck=true|false endpoint URI parameter. Default value is * true. When set to false consumer actors need to additionally * call Consumer.ack within Actor.receive. */ - @BeanProperty var autoack: Boolean = true + @BeanProperty var autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck") } /** @@ -159,7 +160,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex if (exchange.isOutCapable) { //InOut sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously) } else { // inOnly - if (endpoint.autoack) { //autoAck + if (endpoint.autoAck) { //autoAck fireAndForget(message, exchange) notifyDoneSynchronously() true // done sync @@ -172,7 +173,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } @@ -180,7 +181,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex case Right(Ack) ⇒ { /* no response message to set */ } case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } @@ -231,7 +232,7 @@ private[camel] case class ActorEndpointPath private (actorPath: String) { import ActorEndpointPath._ require(actorPath != null) require(actorPath.length() > 0) - def toCamelPath(config: ConsumerConfig = consumerConfig): String = CamelPath.toCamelUri(actorPath, config) + require(actorPath.startsWith("akka://")) def findActorIn(system: ActorSystem): Option[ActorRef] = { val ref = system.actorFor(actorPath) @@ -240,53 +241,47 @@ private[camel] case class ActorEndpointPath private (actorPath: String) { } /** - * Converts ActorRefs and actorPaths to uri's that point to the actor through the Camel Actor Component. + * Converts ActorRefs and actorPaths to URI's that point to the actor through the Camel Actor Component. * Can also be used in the Java API as a helper for custom route builders. the Scala API has an implicit conversion in the camel package to - * directly use `to(actorRef)`. In java you could use `to(CamelPath.toCamelUri(actorRef)`. + * directly use `to(actorRef)`. In java you could use `to(CamelPath.toUri(actorRef)`. + * The URI to the actor is exactly the same as the string representation of the ActorPath, except that it can also have optional URI parameters to configure the Consumer Actor. */ object CamelPath { - private val uriFormat = "actor://path:%s?%s" - /** - * Converts the actorRef to a camel URI (string) which can be used in custom routes. + * Converts the actorRef to a Camel URI (string) which can be used in custom routes. + * The created URI will have no parameters, it is purely the string representation of the actor's path. * @param actorRef the actorRef * @return the camel URI to the actor. */ - def toCamelUri(actorRef: ActorRef): String = uriFormat format (actorRef.path.toString, ActorEndpointPath.consumerConfig.toCamelParameters) + def toUri(actorRef: ActorRef): String = actorRef.path.toString /** - * Converts the actorRef to a camel URI (string) which can be used in custom routes. the consumerConfig that is supplied is used to determine the parameters of - * the URI. + * Converts the actorRef to a Camel URI (string) which can be used in custom routes. + * Use this version of toUri when you know that the actorRef points to a Consumer Actor and you would like to + * set autoAck and replyTimeout parameters to non-default values. + * * @param actorRef the actorRef - * @param consumerConfig the configuration of the Consumer actor - * @return the camel URI to the actor. + * @param autoAck parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]] + * @param replyTimeout parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]] + * @return the camel URI to the Consumer actor, including the parameters for auto acknowledgement and replyTimeout. */ - def toCamelUri(actorRef: ActorRef, consumerConfig: ConsumerConfig): String = uriFormat format (actorRef.path.toString, consumerConfig.toCamelParameters) - - /** - * Converts the actorPath to a camel URI (string) which can be used in custom routes. the consumerConfig that is supplied is used to determine the parameters of - * the URI to the consumer actor. - * @param actorPath the actor path string - * @param consumerConfig the configuration of the Consumer actor - * @return the camel URI to the actor. - */ - def toCamelUri(actorPath: String, consumerConfig: ConsumerConfig): String = uriFormat format (actorPath, consumerConfig.toCamelParameters) + def toUri(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): String = "%s?autoAck=%s&replyTimeout=%s".format(actorRef.path.toString, autoAck, replyTimeout.toString) } /** * For internal use only. Companion of `ActorEndpointPath` */ -private[camel] object ActorEndpointPath { - private[camel] val consumerConfig: ConsumerConfig = new ConsumerConfig {} +private[camel] case object ActorEndpointPath { def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString) /** - * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI). - * Expects the remaining part of the URI (the actor path) in a format: path:%s + * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the uri + * Expects the uri in the akka [[akka.actor.ActorPath]] format, i.e 'akka://system/user/someactor'. + * parameters can be optionally added to the actor path to indicate auto-acknowledgement and replyTimeout for a [[akka.camel.Consumer]] actor. */ def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match { - case id if id startsWith "path:" ⇒ new ActorEndpointPath(id substring 5) - case _ ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be path:" format camelPath) + case id if id startsWith "akka://" ⇒ new ActorEndpointPath(id.split('?')(0)) + case _ ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be an actorPath starting with 'akka://', optionally followed by options" format camelPath) } -} \ No newline at end of file +} diff --git a/akka-camel/src/main/scala/akka/package.scala b/akka-camel/src/main/scala/akka/package.scala index d3e60ae24f..f8cbc7d069 100644 --- a/akka-camel/src/main/scala/akka/package.scala +++ b/akka-camel/src/main/scala/akka/package.scala @@ -14,5 +14,5 @@ package object camel { * from("file://data/input/CamelConsumer").to(actor) * }}} */ - implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition) + implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition) } \ No newline at end of file diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java new file mode 100644 index 0000000000..f97a62dae1 --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -0,0 +1,176 @@ +package akka.camel; + +import akka.actor.*; +import akka.camel.internal.component.CamelPath; +import akka.camel.javaapi.UntypedConsumerActor; +import akka.camel.javaapi.UntypedProducerActor; +import akka.util.FiniteDuration; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.Predicate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class CustomRouteTestBase { + + private static Camel camel; + private static ActorSystem system; + + @BeforeClass + public static void setUpBeforeClass() { + system = ActorSystem.create("test"); + camel = (Camel) CamelExtension.get(system); + } + + @AfterClass + public static void cleanup() { + system.shutdown(); + } + + @Test + public void testCustomProducerRoute() throws Exception { + MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class); + ActorRef producer = system.actorOf(new Props(MockEndpointProducer.class), "mockEndpoint"); + camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer)); + camel.template().sendBody("direct:test", "test"); + assertMockEndpoint(mockEndpoint); + system.stop(producer); + } + + @Test + public void testCustomConsumerRoute() throws Exception { + MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); + ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer"); + camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); + camel.context().addRoutes(new CustomRouteBuilder("direct:testConsumer",consumer)); + camel.template().sendBody("direct:testConsumer", "test"); + assertMockEndpoint(mockEndpoint); + system.stop(consumer); + } + + @Test + public void testCustomAckConsumerRoute() throws Exception { + MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); + ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ + public Actor create() { + return new TestAckConsumer("mock:mockAck"); + } + }), "testConsumerAck"); + camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); + camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS))); + camel.template().sendBody("direct:testAck", "test"); + assertMockEndpoint(mockEndpoint); + system.stop(consumer); + } + + @Test + public void testCustomAckConsumerRouteFromUri() throws Exception { + MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); + ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ + public Actor create() { + return new TestAckConsumer("mock:mockAckUri"); + } + }), "testConsumerAckUri"); + camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS)); + camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); + camel.template().sendBody("direct:testAckFromUri", "test"); + assertMockEndpoint(mockEndpoint); + system.stop(consumer); + } + + @Test(expected=CamelExecutionException.class) + public void testCustomTimeoutConsumerRoute() throws Exception { + ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){ + public Actor create() { + return new TestAckConsumer("mock:mockAckUri"); + } + }), "testConsumerException"); + camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS)); + camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS))); + camel.template().sendBody("direct:testException", "test"); + } + + private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException { + mockEndpoint.expectedMessageCount(1); + mockEndpoint.expectedMessagesMatches(new Predicate() { + public boolean matches(Exchange exchange) { + return exchange.getIn().getBody().equals("test"); + } + }); + mockEndpoint.assertIsSatisfied(); + } + + public static class CustomRouteBuilder extends RouteBuilder { + private String uri; + private String fromUri; + + public CustomRouteBuilder(String from, String to) { + fromUri = from; + uri = to; + } + + public CustomRouteBuilder(String from, ActorRef actor) { + fromUri = from; + uri = CamelPath.toUri(actor); + } + + public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) { + fromUri = from; + uri = CamelPath.toUri(actor, autoAck, replyTimeout); + } + + @Override + public void configure() throws Exception { + from(fromUri).to(uri); + } + } + + public static class TestAckConsumer extends UntypedConsumerActor { + + String endpoint; + + public TestAckConsumer(String to){ + endpoint = to; + } + + @Override + public String getEndpointUri() { + return "direct:testconsumer"; + } + + @Override + public void onReceive(Object message) { + this.getProducerTemplate().sendBody(endpoint, "test"); + getSender().tell(Ack.getInstance()); + } + } + + + public static class TestConsumer extends UntypedConsumerActor { + @Override + public String getEndpointUri() { + return "direct:testconsumer"; + } + + @Override + public void onReceive(Object message) { + this.getProducerTemplate().sendBody("mock:mockConsumer","test"); + } + } + + public static class MockEndpointProducer extends UntypedProducerActor { + public String getEndpointUri() { + return "mock:mockProducer"; + } + + @Override + public boolean isOneway() { + return true; + } + } +} diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 5a0d52572f..f03150b8ae 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -168,5 +168,5 @@ trait ErrorPassing { } trait ManualAckConsumer extends Consumer { - override def autoack = false + override def autoAck = false } diff --git a/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala b/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala new file mode 100644 index 0000000000..f9b54eb8e8 --- /dev/null +++ b/akka-camel/src/test/scala/akka/camel/CustomRouteTest.scala @@ -0,0 +1,5 @@ +package akka.camel + +import org.scalatest.junit.JUnitSuite + +class CustomRouteTest extends CustomRouteTestBase with JUnitSuite diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala index dd37716295..c057b83cba 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala @@ -12,15 +12,15 @@ import org.scalatest.WordSpec class ActorComponentConfigurationTest extends WordSpec with MustMatchers with SharedCamelSystem { - val component: Component = camel.context.getComponent("actor") + val component: Component = camel.context.getComponent("akka") "Endpoint url config must be correctly parsed" in { - val actorEndpointConfig = component.createEndpoint("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig] + val actorEndpointConfig = component.createEndpoint("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig] actorEndpointConfig must have( - 'endpointUri("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos"), - 'path(ActorEndpointPath.fromCamelPath("path:akka://test/user/$a")), - 'autoack(false), + 'endpointUri("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos"), + 'path(ActorEndpointPath.fromCamelPath("akka://test/user/$a")), + 'autoAck(false), 'replyTimeout(987000000 nanos)) } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala index eddec13b8b..ada4497f29 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala @@ -12,16 +12,21 @@ import org.scalatest.WordSpec class ActorEndpointPathTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { - def find(path: String) = ActorEndpointPath.fromCamelPath("path:" + path).findActorIn(system) + def find(path: String) = ActorEndpointPath.fromCamelPath(path).findActorIn(system) "findActorIn returns Some(actor ref) if actor exists" in { - val path = system.actorOf(Props(behavior = ctx ⇒ { case _ ⇒ {} })).path + val path = system.actorOf(Props(behavior = ctx ⇒ { case _ ⇒ {} }), "knownactor").path find(path.toString) must be('defined) } "findActorIn returns None" when { - "invalid path" in { find("some_invalid_path") must be(None) } - "non existing valid path" in { find("akka://system/user/$a") must be(None) } + "non existing valid path" in { find("akka://system/user/unknownactor") must be(None) } + } + "fromCamelPath throws IllegalArgumentException" when { + "invalid path" in { + intercept[IllegalArgumentException] { + find("invalidpath") + } + } } - } \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index a0e153fd54..891b630a98 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -321,8 +321,8 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = { - val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel) - endpoint.autoack = isAutoAck + val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel, system) + endpoint.autoAck = isAutoAck endpoint.replyTimeout = _replyTimeout endpoint } diff --git a/akka-docs/java/camel.rst b/akka-docs/java/camel.rst index d5f7a6f3d5..9128e1dd91 100644 --- a/akka-docs/java/camel.rst +++ b/akka-docs/java/camel.rst @@ -218,7 +218,7 @@ is added to the consumer actor's mailbox. Any failure or exception that occurs during processing of that message by the consumer actor cannot be reported back to the endpoint in this case. To allow consumer actors to positively or negatively acknowledge the receipt of a message from an in-only message -exchange, they need to override the ``autoack`` method to return false. +exchange, they need to override the ``autoAck`` method to return false. In this case, consumer actors must reply either with a special Ack message (positive acknowledgement) or a Failure (negative acknowledgement). @@ -404,7 +404,7 @@ engine`_. This component accepts the following endpoint URI format: -* ``actor://path:[]?`` +* ``[]?`` where ```` is the ``ActorPath`` to the actor. The ```` are name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). @@ -427,7 +427,7 @@ The following URI options are supported: | | | | | | | | | See also :ref:`camel-timeout`. | +--------------+----------+---------+------------------------------------------------+ -| autoack | Boolean | true | If set to true, in-only message exchanges | +| autoAck | Boolean | true | If set to true, in-only message exchanges | | | | | are auto-acknowledged when the message is | | | | | added to the actor's mailbox. If set to | | | | | false, actors must acknowledge the | @@ -438,7 +438,7 @@ The following URI options are supported: Here's an actor endpoint URI example containing an actor uuid:: - actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis + akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis In the following example, a custom route to an actor is created, using the actor's path. @@ -608,4 +608,4 @@ seconds: .. includecode:: code/docs/camel/sample/quartz/QuartzSample.java#QuartzExample For more information about the Camel Quartz component, see here: -http://camel.apache.org/quartz.html \ No newline at end of file +http://camel.apache.org/quartz.html diff --git a/akka-docs/java/code/docs/camel/Consumer3.java b/akka-docs/java/code/docs/camel/Consumer3.java index 431318dc2c..bf661cb8ea 100644 --- a/akka-docs/java/code/docs/camel/Consumer3.java +++ b/akka-docs/java/code/docs/camel/Consumer3.java @@ -8,7 +8,7 @@ import akka.camel.javaapi.UntypedConsumerActor; public class Consumer3 extends UntypedConsumerActor{ @Override - public boolean autoack() { + public boolean autoAck() { return false; } diff --git a/akka-docs/java/code/docs/camel/CustomRouteBuilder.java b/akka-docs/java/code/docs/camel/CustomRouteBuilder.java index f30856b2a5..8bdbffeecd 100644 --- a/akka-docs/java/code/docs/camel/CustomRouteBuilder.java +++ b/akka-docs/java/code/docs/camel/CustomRouteBuilder.java @@ -12,7 +12,7 @@ public class CustomRouteBuilder extends RouteBuilder{ } public void configure() throws Exception { - from("jetty:http://localhost:8877/camel/custom").to(CamelPath.toCamelUri(responder)); + from("jetty:http://localhost:8877/camel/custom").to(CamelPath.toUri(responder)); } } //#CustomRoute diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index 1eec932b44..6ed7debe83 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -215,7 +215,7 @@ is added to the consumer actor's mailbox. Any failure or exception that occurs during processing of that message by the consumer actor cannot be reported back to the endpoint in this case. To allow consumer actors to positively or negatively acknowledge the receipt of a message from an in-only message -exchange, they need to override the ``autoack`` method to return false. +exchange, they need to override the ``autoAck`` method to return false. In this case, consumer actors must reply either with a special Ack message (positive acknowledgement) or a Failure (negative acknowledgement). @@ -399,7 +399,7 @@ engine`_. This component accepts the following endpoint URI format: -* ``actor://path:[]?`` +* ``[]?`` where ```` is the ``ActorPath`` to the actor. The ```` are name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). @@ -422,7 +422,7 @@ The following URI options are supported: | | | | | | | | | See also :ref:`camel-timeout`. | +--------------+----------+---------+-------------------------------------------+ -| autoack | Boolean | true | If set to true, in-only message exchanges | +| autoAck | Boolean | true | If set to true, in-only message exchanges | | | | | are auto-acknowledged when the message is | | | | | added to the actor's mailbox. If set to | | | | | false, actors must acknowledge the | @@ -433,7 +433,7 @@ The following URI options are supported: Here's an actor endpoint URI example containing an actor uuid:: - actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis + akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis In the following example, a custom route to an actor is created, using the actor's path. the akka camel package contains an implicit ``toActorRouteDefinition`` that allows for a route to diff --git a/akka-docs/scala/code/docs/camel/Consumers.scala b/akka-docs/scala/code/docs/camel/Consumers.scala index 85b46646e2..abb43a048a 100644 --- a/akka-docs/scala/code/docs/camel/Consumers.scala +++ b/akka-docs/scala/code/docs/camel/Consumers.scala @@ -34,7 +34,7 @@ object Consumers { import akka.actor.Status.Failure class Consumer3 extends Consumer { - override def autoack = false + override def autoAck = false def endpointUri = "jms:queue:test"