Changed akka camel URI pattern to akka ActorPath pattern, like akka://system/user/someactor

fixed some fixmes regarding using the reference.conf config
Added some more java tests for custom routes with producers and consumers
changed autoack to autoAck
This commit is contained in:
RayRoestenburg 2012-07-18 08:06:07 +02:00
parent 2472e46263
commit ee4a8afee9
21 changed files with 308 additions and 103 deletions

View file

@ -0,0 +1,16 @@
####################################
# Akka Camel Reference Config File #
####################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {
camel {
consumer {
autoAck = true
replyTimeout = 1m
activationTimeout = 10s
}
}
}

View file

@ -4,9 +4,10 @@
package akka.camel package akka.camel
import internal.component.ActorEndpointPath import internal.component.CamelPath
import akka.actor.ActorRef import akka.actor.ActorRef
import org.apache.camel.model.ProcessorDefinition import org.apache.camel.model.ProcessorDefinition
import akka.util.Duration
/** /**
* Wraps a [[org.apache.camel.model.ProcessorDefinition]]. * Wraps a [[org.apache.camel.model.ProcessorDefinition]].
@ -23,22 +24,25 @@ import org.apache.camel.model.ProcessorDefinition
* }}} * }}}
* @param definition the processor definition * @param definition the processor definition
*/ */
class ActorRouteDefinition(definition: ProcessorDefinition[_]) { class ActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) {
/** /**
* Sends the message to an ActorRef endpoint. * Sends the message to an ActorRef endpoint.
* @param actorRef the consumer with a default configuration. * @param actorRef the actorRef to the actor.
* @return the path to the actor, as a camel uri String * @return the path to the actor, as a camel uri String
*/ */
def to(actorRef: ActorRef) = //FIXME What is the return type of this? def to(actorRef: ActorRef): T = definition.to(CamelPath.toUri(actorRef))
definition.to(ActorEndpointPath(actorRef).toCamelPath())
/** /**
* Sends the message to an ActorRef endpoint * Sends the message to an ActorRef endpoint
* @param actorRef the consumer * @param actorRef the consumer
* @param consumerConfig the configuration for the consumer * @param autoAck Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only.
* @param replyTimeout When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication.
* @return the path to the actor, as a camel uri String * @return the path to the actor, as a camel uri String
*/ */
def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = //FIXME What is the return type of this? def to(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): T = definition.to(CamelPath.toUri(actorRef, autoAck, replyTimeout))
definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig))
} }

View file

@ -229,7 +229,7 @@ object CamelMessage {
/** /**
* Positive acknowledgement message (used for application-acknowledged message receipts). * Positive acknowledgement message (used for application-acknowledged message receipts).
* When `autoack` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage. * When `autoAck` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage.
* @author Martin Krasser * @author Martin Krasser
*/ */
case object Ack { case object Ack {

View file

@ -0,0 +1,14 @@
package akka.camel
import akka.actor.Actor
private[camel] trait CamelSupport { this: Actor
/**
* camel extension
*/
protected[this] implicit def camel = CamelExtension(context.system)
/**
* camelContext implicit is useful when using advanced methods of CamelMessage.
*/
protected[this] implicit def camelContext = camel.context
}

View file

@ -6,47 +6,48 @@ package akka.camel
import internal.component.DurationTypeConverter import internal.component.DurationTypeConverter
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition } import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
import akka.actor._ import akka.actor._
import akka.util.Duration import akka.util.Duration
import akka.util.duration._ import akka.util.duration._
import java.util.concurrent.TimeUnit.MILLISECONDS
/** /**
* Mixed in by Actor implementations that consume message from Camel endpoints. * Mixed in by Actor implementations that consume message from Camel endpoints.
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
trait Consumer extends Actor with ConsumerConfig { trait Consumer extends Actor with CamelSupport with ConsumerConfig {
def endpointUri: String def endpointUri: String
protected[this] implicit def camel = CamelExtension(context.system)
protected[this] implicit def camelContext = camel.context
camel.registerConsumer(endpointUri, this, activationTimeout) camel.registerConsumer(endpointUri, this, activationTimeout)
} }
trait ConsumerConfig { case object DefaultConsumerParameters {
val replyTimeout = 1 minute
val autoAck = true
}
trait ConsumerConfig { this: Actor
private val config = this.context.system.settings.config
/** /**
* How long the actor should wait for activation before it fails. * How long the actor should wait for activation before it fails.
*/ */
def activationTimeout: Duration = 10 seconds // FIXME Should be configured in reference.conf def activationTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.activationTimeout"), MILLISECONDS)
/** /**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time * When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
* the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute. * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute.
* This setting is used for out-capable, in-only, manually acknowledged communication. * This setting is used for out-capable, in-only, manually acknowledged communication.
* When the blocking is set to Blocking replyTimeout is ignored.
*/ */
def replyTimeout: Duration = 1 minute // FIXME Should be configured in reference.conf def replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
/** /**
* Determines whether one-way communications between an endpoint and this consumer actor * Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged. * should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only. * This flag has only effect when exchange is in-only.
*/ */
def autoack: Boolean = true // FIXME Should be configured in reference.conf def autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
/** /**
* The route definition handler for creating a custom route to this consumer instance. * The route definition handler for creating a custom route to this consumer instance.
@ -54,9 +55,4 @@ trait ConsumerConfig {
//FIXME: write a test confirming onRouteDefinition gets called //FIXME: write a test confirming onRouteDefinition gets called
def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd
/**
* For internal use only. Converts this ConsumerConfig to camel URI parameters
* @return
*/
private[camel] def toCamelParameters: String = "autoack=%s&replyTimeout=%s" format (autoack, DurationTypeConverter.toString(replyTimeout))
} }

View file

@ -15,13 +15,7 @@ import org.apache.camel.processor.SendProcessor
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
trait ProducerSupport { this: Actor trait ProducerSupport extends CamelSupport { this: Actor
protected[this] implicit def camel = CamelExtension(context.system) // FIXME This is duplicated from Consumer, create a common base-trait?
/**
* camelContext implicit is useful when using advanced methods of CamelMessage.
*/
protected[this] implicit def camelContext = camel.context // FIXME This is duplicated from Consumer, create a common base-trait?
protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri) protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)

View file

@ -5,7 +5,7 @@
package akka.camel.internal package akka.camel.internal
import akka.camel._ import akka.camel._
import component.ActorEndpointPath import component.CamelPath
import java.io.InputStream import java.io.InputStream
import org.apache.camel.builder.RouteBuilder import org.apache.camel.builder.RouteBuilder
@ -122,7 +122,7 @@ private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorR
*/ */
private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder {
protected def targetActorUri = ActorEndpointPath(consumer).toCamelPath(config) protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
def configure() { def configure() {
val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."

View file

@ -28,7 +28,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
val ctx = new DefaultCamelContext val ctx = new DefaultCamelContext
ctx.setName(system.name) ctx.setName(system.name)
ctx.setStreamCaching(true) ctx.setStreamCaching(true)
ctx.addComponent("actor", new ActorComponent(this)) ctx.addComponent("akka", new ActorComponent(this, system))
ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter) ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter)
ctx ctx
} }

View file

@ -13,12 +13,11 @@ import akka.actor._
import akka.pattern._ import akka.pattern._
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import akka.util.duration._
import java.util.concurrent.{ TimeoutException, CountDownLatch } import java.util.concurrent.{ TimeoutException, CountDownLatch }
import akka.camel.internal.CamelExchangeAdapter import akka.camel.internal.CamelExchangeAdapter
import akka.util.{ NonFatal, Duration, Timeout } import akka.util.{ NonFatal, Duration, Timeout }
import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
import java.util.concurrent.TimeUnit.MILLISECONDS
/** /**
* For internal use only. * For internal use only.
* Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]]. * Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]].
@ -31,12 +30,12 @@ import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, Fai
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
private[camel] class ActorComponent(camel: Camel) extends DefaultComponent { private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends DefaultComponent {
/** /**
* @see org.apache.camel.Component * @see org.apache.camel.Component
*/ */
def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(remaining), camel) new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel, system)
} }
/** /**
@ -47,7 +46,7 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
* The `ActorEndpoint`s are created by the [[akka.camel.internal.component.ActorComponent]]. * The `ActorEndpoint`s are created by the [[akka.camel.internal.component.ActorComponent]].
* *
* Actors are referenced using actor endpoint URIs of the following format: * Actors are referenced using actor endpoint URIs of the following format:
* <code>actor://path:[actorPath]?[options]%s</code>, * <code>[actorPath]?[options]%s</code>,
* where <code>[actorPath]</code> refers to the actor path to the actor. * where <code>[actorPath]</code> refers to the actor path to the actor.
* *
* @author Martin Krasser * @author Martin Krasser
@ -55,14 +54,15 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
private[camel] class ActorEndpoint(uri: String, private[camel] class ActorEndpoint(uri: String,
comp: ActorComponent, comp: ActorComponent,
val path: ActorEndpointPath, val path: ActorEndpointPath,
camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig { camel: Camel,
val system: ActorSystem) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {
/** /**
* The ActorEndpoint only supports receiving messages from Camel. * The ActorEndpoint only supports receiving messages from Camel.
* The createProducer method (not to be confused with a producer actor) is used to send messages into the endpoint. * The createProducer method (not to be confused with a producer actor) is used to send messages into the endpoint.
* The ActorComponent is only there to send to actors registered through an actor endpoint URI. * The ActorComponent is only there to send to actors registered through an actor endpoint URI.
* You can use an actor as an endpoint to send to in a camel route (as in, a Camel Consumer Actor). so from(someuri) to (actoruri), but not 'the other way around'. * You can use an actor as an endpoint to send to in a camel route (as in, a Camel Consumer Actor). so from(someuri) to (actoruri), but not 'the other way around'.
* Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages.
* [[akka.camel.Producer]] Actors can be used for sending messages to some other uri/ component type registered in Camel. * [[akka.camel.Producer]] Actors can be used for sending messages to some other uri/ component type registered in Camel.
* @throws UnsupportedOperationException this method is not supported * @throws UnsupportedOperationException this method is not supported
*/ */
@ -89,16 +89,17 @@ private[camel] class ActorEndpoint(uri: String,
*/ */
private[camel] trait ActorEndpointConfig { private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath def path: ActorEndpointPath
def system: ActorSystem
@BeanProperty var replyTimeout: Duration = 1 minute // FIXME default should be in config, not code private def config = system.settings.config
@BeanProperty var replyTimeout: Duration = Duration(config.getMilliseconds("akka.camel.consumer.replyTimeout"), MILLISECONDS)
/** /**
* Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is * Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is
* set via the <code>autoack=true|false</code> endpoint URI parameter. Default value is * set via the <code>autoAck=true|false</code> endpoint URI parameter. Default value is
* <code>true</code>. When set to <code>false</code> consumer actors need to additionally * <code>true</code>. When set to <code>false</code> consumer actors need to additionally
* call <code>Consumer.ack</code> within <code>Actor.receive</code>. * call <code>Consumer.ack</code> within <code>Actor.receive</code>.
*/ */
@BeanProperty var autoack: Boolean = true @BeanProperty var autoAck: Boolean = config.getBoolean("akka.camel.consumer.autoAck")
} }
/** /**
@ -159,7 +160,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
if (exchange.isOutCapable) { //InOut if (exchange.isOutCapable) { //InOut
sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously) sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously)
} else { // inOnly } else { // inOnly
if (endpoint.autoack) { //autoAck if (endpoint.autoAck) { //autoAck
fireAndForget(message, exchange) fireAndForget(message, exchange)
notifyDoneSynchronously() notifyDoneSynchronously()
true // done sync true // done sync
@ -172,7 +173,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
case Right(failure: FailureResult) exchange.setFailure(failure) case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setResponse(CamelMessage.canonicalize(msg)) case Right(msg) exchange.setResponse(CamelMessage.canonicalize(msg))
case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(FailureResult(throwable)) case Left(throwable) exchange.setFailure(FailureResult(throwable))
} }
@ -180,7 +181,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
case Right(Ack) { /* no response message to set */ } case Right(Ack) { /* no response message to set */ }
case Right(failure: FailureResult) exchange.setFailure(failure) case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) case Right(msg) exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(FailureResult(throwable)) case Left(throwable) exchange.setFailure(FailureResult(throwable))
} }
@ -231,7 +232,7 @@ private[camel] case class ActorEndpointPath private (actorPath: String) {
import ActorEndpointPath._ import ActorEndpointPath._
require(actorPath != null) require(actorPath != null)
require(actorPath.length() > 0) require(actorPath.length() > 0)
def toCamelPath(config: ConsumerConfig = consumerConfig): String = CamelPath.toCamelUri(actorPath, config) require(actorPath.startsWith("akka://"))
def findActorIn(system: ActorSystem): Option[ActorRef] = { def findActorIn(system: ActorSystem): Option[ActorRef] = {
val ref = system.actorFor(actorPath) val ref = system.actorFor(actorPath)
@ -240,53 +241,47 @@ private[camel] case class ActorEndpointPath private (actorPath: String) {
} }
/** /**
* Converts ActorRefs and actorPaths to uri's that point to the actor through the Camel Actor Component. * Converts ActorRefs and actorPaths to URI's that point to the actor through the Camel Actor Component.
* Can also be used in the Java API as a helper for custom route builders. the Scala API has an implicit conversion in the camel package to * Can also be used in the Java API as a helper for custom route builders. the Scala API has an implicit conversion in the camel package to
* directly use `to(actorRef)`. In java you could use `to(CamelPath.toCamelUri(actorRef)`. * directly use `to(actorRef)`. In java you could use `to(CamelPath.toUri(actorRef)`.
* The URI to the actor is exactly the same as the string representation of the ActorPath, except that it can also have optional URI parameters to configure the Consumer Actor.
*/ */
object CamelPath { object CamelPath {
private val uriFormat = "actor://path:%s?%s"
/** /**
* Converts the actorRef to a camel URI (string) which can be used in custom routes. * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
* The created URI will have no parameters, it is purely the string representation of the actor's path.
* @param actorRef the actorRef * @param actorRef the actorRef
* @return the camel URI to the actor. * @return the camel URI to the actor.
*/ */
def toCamelUri(actorRef: ActorRef): String = uriFormat format (actorRef.path.toString, ActorEndpointPath.consumerConfig.toCamelParameters) def toUri(actorRef: ActorRef): String = actorRef.path.toString
/** /**
* Converts the actorRef to a camel URI (string) which can be used in custom routes. the consumerConfig that is supplied is used to determine the parameters of * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
* the URI. * Use this version of toUri when you know that the actorRef points to a Consumer Actor and you would like to
* set autoAck and replyTimeout parameters to non-default values.
*
* @param actorRef the actorRef * @param actorRef the actorRef
* @param consumerConfig the configuration of the Consumer actor * @param autoAck parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
* @return the camel URI to the actor. * @param replyTimeout parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
* @return the camel URI to the Consumer actor, including the parameters for auto acknowledgement and replyTimeout.
*/ */
def toCamelUri(actorRef: ActorRef, consumerConfig: ConsumerConfig): String = uriFormat format (actorRef.path.toString, consumerConfig.toCamelParameters) def toUri(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): String = "%s?autoAck=%s&replyTimeout=%s".format(actorRef.path.toString, autoAck, replyTimeout.toString)
/**
* Converts the actorPath to a camel URI (string) which can be used in custom routes. the consumerConfig that is supplied is used to determine the parameters of
* the URI to the consumer actor.
* @param actorPath the actor path string
* @param consumerConfig the configuration of the Consumer actor
* @return the camel URI to the actor.
*/
def toCamelUri(actorPath: String, consumerConfig: ConsumerConfig): String = uriFormat format (actorPath, consumerConfig.toCamelParameters)
} }
/** /**
* For internal use only. Companion of `ActorEndpointPath` * For internal use only. Companion of `ActorEndpointPath`
*/ */
private[camel] object ActorEndpointPath { private[camel] case object ActorEndpointPath {
private[camel] val consumerConfig: ConsumerConfig = new ConsumerConfig {}
def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString) def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString)
/** /**
* Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI). * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the uri
* Expects the remaining part of the URI (the actor path) in a format: path:%s * Expects the uri in the akka [[akka.actor.ActorPath]] format, i.e 'akka://system/user/someactor'.
* parameters can be optionally added to the actor path to indicate auto-acknowledgement and replyTimeout for a [[akka.camel.Consumer]] actor.
*/ */
def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match { def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match {
case id if id startsWith "path:" new ActorEndpointPath(id substring 5) case id if id startsWith "akka://" new ActorEndpointPath(id.split('?')(0))
case _ throw new IllegalArgumentException("Invalid path: [%s] - should be path:<actorPath>" format camelPath) case _ throw new IllegalArgumentException("Invalid path: [%s] - should be an actorPath starting with 'akka://', optionally followed by options" format camelPath)
} }
} }

View file

@ -14,5 +14,5 @@ package object camel {
* from("file://data/input/CamelConsumer").to(actor) * from("file://data/input/CamelConsumer").to(actor)
* }}} * }}}
*/ */
implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition) implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition)
} }

View file

@ -0,0 +1,176 @@
package akka.camel;
import akka.actor.*;
import akka.camel.internal.component.CamelPath;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.camel.javaapi.UntypedProducerActor;
import akka.util.FiniteDuration;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class CustomRouteTestBase {
private static Camel camel;
private static ActorSystem system;
@BeforeClass
public static void setUpBeforeClass() {
system = ActorSystem.create("test");
camel = (Camel) CamelExtension.get(system);
}
@AfterClass
public static void cleanup() {
system.shutdown();
}
@Test
public void testCustomProducerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockProducer", MockEndpoint.class);
ActorRef producer = system.actorOf(new Props(MockEndpointProducer.class), "mockEndpoint");
camel.context().addRoutes(new CustomRouteBuilder("direct:test",producer));
camel.template().sendBody("direct:test", "test");
assertMockEndpoint(mockEndpoint);
system.stop(producer);
}
@Test
public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props(TestConsumer.class), "testConsumer");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
camel.context().addRoutes(new CustomRouteBuilder("direct:testConsumer",consumer));
camel.template().sendBody("direct:testConsumer", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
}
@Test
public void testCustomAckConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
return new TestAckConsumer("mock:mockAck");
}
}), "testConsumerAck");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, new FiniteDuration(10, TimeUnit.SECONDS)));
camel.template().sendBody("direct:testAck", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
}
@Test
public void testCustomAckConsumerRouteFromUri() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
return new TestAckConsumer("mock:mockAckUri");
}
}), "testConsumerAckUri");
camel.awaitActivation(consumer,new FiniteDuration(10, TimeUnit.SECONDS));
camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false"));
camel.template().sendBody("direct:testAckFromUri", "test");
assertMockEndpoint(mockEndpoint);
system.stop(consumer);
}
@Test(expected=CamelExecutionException.class)
public void testCustomTimeoutConsumerRoute() throws Exception {
ActorRef consumer = system.actorOf(new Props( new UntypedActorFactory(){
public Actor create() {
return new TestAckConsumer("mock:mockAckUri");
}
}), "testConsumerException");
camel.awaitActivation(consumer, new FiniteDuration(10, TimeUnit.SECONDS));
camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, new FiniteDuration(0, TimeUnit.SECONDS)));
camel.template().sendBody("direct:testException", "test");
}
private void assertMockEndpoint(MockEndpoint mockEndpoint) throws InterruptedException {
mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedMessagesMatches(new Predicate() {
public boolean matches(Exchange exchange) {
return exchange.getIn().getBody().equals("test");
}
});
mockEndpoint.assertIsSatisfied();
}
public static class CustomRouteBuilder extends RouteBuilder {
private String uri;
private String fromUri;
public CustomRouteBuilder(String from, String to) {
fromUri = from;
uri = to;
}
public CustomRouteBuilder(String from, ActorRef actor) {
fromUri = from;
uri = CamelPath.toUri(actor);
}
public CustomRouteBuilder(String from, ActorRef actor, boolean autoAck, FiniteDuration replyTimeout) {
fromUri = from;
uri = CamelPath.toUri(actor, autoAck, replyTimeout);
}
@Override
public void configure() throws Exception {
from(fromUri).to(uri);
}
}
public static class TestAckConsumer extends UntypedConsumerActor {
String endpoint;
public TestAckConsumer(String to){
endpoint = to;
}
@Override
public String getEndpointUri() {
return "direct:testconsumer";
}
@Override
public void onReceive(Object message) {
this.getProducerTemplate().sendBody(endpoint, "test");
getSender().tell(Ack.getInstance());
}
}
public static class TestConsumer extends UntypedConsumerActor {
@Override
public String getEndpointUri() {
return "direct:testconsumer";
}
@Override
public void onReceive(Object message) {
this.getProducerTemplate().sendBody("mock:mockConsumer","test");
}
}
public static class MockEndpointProducer extends UntypedProducerActor {
public String getEndpointUri() {
return "mock:mockProducer";
}
@Override
public boolean isOneway() {
return true;
}
}
}

View file

@ -168,5 +168,5 @@ trait ErrorPassing {
} }
trait ManualAckConsumer extends Consumer { trait ManualAckConsumer extends Consumer {
override def autoack = false override def autoAck = false
} }

View file

@ -0,0 +1,5 @@
package akka.camel
import org.scalatest.junit.JUnitSuite
class CustomRouteTest extends CustomRouteTestBase with JUnitSuite

View file

@ -12,15 +12,15 @@ import org.scalatest.WordSpec
class ActorComponentConfigurationTest extends WordSpec with MustMatchers with SharedCamelSystem { class ActorComponentConfigurationTest extends WordSpec with MustMatchers with SharedCamelSystem {
val component: Component = camel.context.getComponent("actor") val component: Component = camel.context.getComponent("akka")
"Endpoint url config must be correctly parsed" in { "Endpoint url config must be correctly parsed" in {
val actorEndpointConfig = component.createEndpoint("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig] val actorEndpointConfig = component.createEndpoint("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig]
actorEndpointConfig must have( actorEndpointConfig must have(
'endpointUri("actor://path:akka://test/user/$a?autoack=false&replyTimeout=987000000+nanos"), 'endpointUri("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos"),
'path(ActorEndpointPath.fromCamelPath("path:akka://test/user/$a")), 'path(ActorEndpointPath.fromCamelPath("akka://test/user/$a")),
'autoack(false), 'autoAck(false),
'replyTimeout(987000000 nanos)) 'replyTimeout(987000000 nanos))
} }

View file

@ -12,16 +12,21 @@ import org.scalatest.WordSpec
class ActorEndpointPathTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { class ActorEndpointPathTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
def find(path: String) = ActorEndpointPath.fromCamelPath("path:" + path).findActorIn(system) def find(path: String) = ActorEndpointPath.fromCamelPath(path).findActorIn(system)
"findActorIn returns Some(actor ref) if actor exists" in { "findActorIn returns Some(actor ref) if actor exists" in {
val path = system.actorOf(Props(behavior = ctx { case _ {} })).path val path = system.actorOf(Props(behavior = ctx { case _ {} }), "knownactor").path
find(path.toString) must be('defined) find(path.toString) must be('defined)
} }
"findActorIn returns None" when { "findActorIn returns None" when {
"invalid path" in { find("some_invalid_path") must be(None) } "non existing valid path" in { find("akka://system/user/unknownactor") must be(None) }
"non existing valid path" in { find("akka://system/user/$a") must be(None) } }
"fromCamelPath throws IllegalArgumentException" when {
"invalid path" in {
intercept[IllegalArgumentException] {
find("invalidpath")
}
}
} }
} }

View file

@ -321,8 +321,8 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
} }
def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = { def config(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = {
val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel) val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel, system)
endpoint.autoack = isAutoAck endpoint.autoAck = isAutoAck
endpoint.replyTimeout = _replyTimeout endpoint.replyTimeout = _replyTimeout
endpoint endpoint
} }

View file

@ -218,7 +218,7 @@ is added to the consumer actor's mailbox. Any failure or exception that occurs
during processing of that message by the consumer actor cannot be reported back during processing of that message by the consumer actor cannot be reported back
to the endpoint in this case. To allow consumer actors to positively or to the endpoint in this case. To allow consumer actors to positively or
negatively acknowledge the receipt of a message from an in-only message negatively acknowledge the receipt of a message from an in-only message
exchange, they need to override the ``autoack`` method to return false. exchange, they need to override the ``autoAck`` method to return false.
In this case, consumer actors must reply either with a In this case, consumer actors must reply either with a
special Ack message (positive acknowledgement) or a Failure (negative special Ack message (positive acknowledgement) or a Failure (negative
acknowledgement). acknowledgement).
@ -404,7 +404,7 @@ engine`_.
This component accepts the following endpoint URI format: This component accepts the following endpoint URI format:
* ``actor://path:[<actor-path>]?<options>`` * ``[<actor-path>]?<options>``
where ``<actor-path>`` is the ``ActorPath`` to the actor. The ``<options>`` are where ``<actor-path>`` is the ``ActorPath`` to the actor. The ``<options>`` are
name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``).
@ -427,7 +427,7 @@ The following URI options are supported:
| | | | | | | | | |
| | | | See also :ref:`camel-timeout`. | | | | | See also :ref:`camel-timeout`. |
+--------------+----------+---------+------------------------------------------------+ +--------------+----------+---------+------------------------------------------------+
| autoack | Boolean | true | If set to true, in-only message exchanges | | autoAck | Boolean | true | If set to true, in-only message exchanges |
| | | | are auto-acknowledged when the message is | | | | | are auto-acknowledged when the message is |
| | | | added to the actor's mailbox. If set to | | | | | added to the actor's mailbox. If set to |
| | | | false, actors must acknowledge the | | | | | false, actors must acknowledge the |
@ -438,7 +438,7 @@ The following URI options are supported:
Here's an actor endpoint URI example containing an actor uuid:: Here's an actor endpoint URI example containing an actor uuid::
actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis
In the following example, a custom route to an actor is created, using the In the following example, a custom route to an actor is created, using the
actor's path. actor's path.

View file

@ -8,7 +8,7 @@ import akka.camel.javaapi.UntypedConsumerActor;
public class Consumer3 extends UntypedConsumerActor{ public class Consumer3 extends UntypedConsumerActor{
@Override @Override
public boolean autoack() { public boolean autoAck() {
return false; return false;
} }

View file

@ -12,7 +12,7 @@ public class CustomRouteBuilder extends RouteBuilder{
} }
public void configure() throws Exception { public void configure() throws Exception {
from("jetty:http://localhost:8877/camel/custom").to(CamelPath.toCamelUri(responder)); from("jetty:http://localhost:8877/camel/custom").to(CamelPath.toUri(responder));
} }
} }
//#CustomRoute //#CustomRoute

View file

@ -215,7 +215,7 @@ is added to the consumer actor's mailbox. Any failure or exception that occurs
during processing of that message by the consumer actor cannot be reported back during processing of that message by the consumer actor cannot be reported back
to the endpoint in this case. To allow consumer actors to positively or to the endpoint in this case. To allow consumer actors to positively or
negatively acknowledge the receipt of a message from an in-only message negatively acknowledge the receipt of a message from an in-only message
exchange, they need to override the ``autoack`` method to return false. exchange, they need to override the ``autoAck`` method to return false.
In this case, consumer actors must reply either with a In this case, consumer actors must reply either with a
special Ack message (positive acknowledgement) or a Failure (negative special Ack message (positive acknowledgement) or a Failure (negative
acknowledgement). acknowledgement).
@ -399,7 +399,7 @@ engine`_.
This component accepts the following endpoint URI format: This component accepts the following endpoint URI format:
* ``actor://path:[<actor-path>]?<options>`` * ``[<actor-path>]?<options>``
where ``<actor-path>`` is the ``ActorPath`` to the actor. The ``<options>`` are where ``<actor-path>`` is the ``ActorPath`` to the actor. The ``<options>`` are
name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``). name-value pairs separated by ``&`` (i.e. ``name1=value1&name2=value2&...``).
@ -422,7 +422,7 @@ The following URI options are supported:
| | | | | | | | | |
| | | | See also :ref:`camel-timeout`. | | | | | See also :ref:`camel-timeout`. |
+--------------+----------+---------+-------------------------------------------+ +--------------+----------+---------+-------------------------------------------+
| autoack | Boolean | true | If set to true, in-only message exchanges | | autoAck | Boolean | true | If set to true, in-only message exchanges |
| | | | are auto-acknowledged when the message is | | | | | are auto-acknowledged when the message is |
| | | | added to the actor's mailbox. If set to | | | | | added to the actor's mailbox. If set to |
| | | | false, actors must acknowledge the | | | | | false, actors must acknowledge the |
@ -433,7 +433,7 @@ The following URI options are supported:
Here's an actor endpoint URI example containing an actor uuid:: Here's an actor endpoint URI example containing an actor uuid::
actor://path:akka://some-system/user/myconsumer?autoack=false&replyTimeout=100+millis akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis
In the following example, a custom route to an actor is created, using the In the following example, a custom route to an actor is created, using the
actor's path. the akka camel package contains an implicit ``toActorRouteDefinition`` that allows for a route to actor's path. the akka camel package contains an implicit ``toActorRouteDefinition`` that allows for a route to

View file

@ -34,7 +34,7 @@ object Consumers {
import akka.actor.Status.Failure import akka.actor.Status.Failure
class Consumer3 extends Consumer { class Consumer3 extends Consumer {
override def autoack = false override def autoAck = false
def endpointUri = "jms:queue:test" def endpointUri = "jms:queue:test"