From a481272b676338ab51e769853efaecb09e7e72af Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Jun 2013 16:45:47 +0200 Subject: [PATCH] Keep the order of buffered messages in Camel producer, see #3477 * fix bug in java sample * doc corrections --- .../src/main/scala/akka/camel/Camel.scala | 8 ++-- .../main/scala/akka/camel/CamelMessage.scala | 3 +- .../main/scala/akka/camel/CamelSupport.scala | 3 +- .../src/main/scala/akka/camel/Consumer.scala | 4 +- .../src/main/scala/akka/camel/Producer.scala | 19 ++++------ .../camel/internal/ActivationMessage.scala | 12 +++--- .../camel/internal/ActivationTracker.scala | 10 +++-- .../camel/internal/CamelExchangeAdapter.scala | 12 +++--- .../akka/camel/internal/CamelSupervisor.scala | 22 +++++------ .../internal/ConsumerActorRouteBuilder.scala | 2 +- .../akka/camel/internal/DefaultCamel.scala | 7 +--- .../internal/component/ActorComponent.scala | 21 ++++++---- akka-docs/rst/java/camel.rst | 38 +++++++++---------- .../docs/camel/sample/route/Consumer3.java | 5 ++- akka-docs/rst/scala/camel.rst | 34 ++++++++--------- 15 files changed, 101 insertions(+), 99 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala index 1273f525f1..3e03e77e24 100644 --- a/akka-camel/src/main/scala/akka/camel/Camel.scala +++ b/akka-camel/src/main/scala/akka/camel/Camel.scala @@ -42,12 +42,14 @@ trait Camel extends Extension with Activation { def settings: CamelSettings /** - * For internal use only. Returns the camel supervisor actor. + * INTERNAL API + * Returns the camel supervisor actor. */ private[camel] def supervisor: ActorRef /** - * For internal use only. Returns the associated ActorSystem. + * INTERNAL API + * Returns the associated ActorSystem. */ private[camel] def system: ActorSystem } @@ -122,7 +124,7 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider { * Creates a new instance of Camel and makes sure it gets stopped when the actor system is shutdown. */ override def createExtension(system: ExtendedActorSystem): Camel = { - val camel = new DefaultCamel(system).start + val camel = new DefaultCamel(system).start() system.registerOnTermination(camel.shutdown()) camel } diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index f39d542e1d..113c3766f7 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -164,9 +164,8 @@ object CamelMessage { private[camel] def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders) /** - * For internal use only. + * INTERNAL API * copies the content of this CamelMessage to an Apache Camel Message. - * */ private[camel] def copyContent(from: CamelMessage, to: JCamelMessage): Unit = { to.setBody(from.body) diff --git a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala index 2471c3400d..4e8b7dfa48 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelSupport.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelSupport.scala @@ -11,7 +11,8 @@ import java.util.concurrent.TimeUnit._ private[camel] trait CamelSupport { this: Actor ⇒ /** - * For internal use only. Returns a [[akka.camel.Camel]] trait which provides access to the CamelExtension. + * INTERNAL API + * Returns a [[akka.camel.Camel]] trait which provides access to the CamelExtension. */ protected val camel = CamelExtension(context.system) diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index c60830943b..3df4149c10 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -30,7 +30,7 @@ trait Consumer extends Actor with CamelSupport { super.preStart() // Possible FIXME. registering the endpoint here because of problems // with order of execution of trait body in the Java version (UntypedConsumerActor) - // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri) + // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri) // and remains null. CustomRouteTest provides a test to verify this. register() } @@ -88,7 +88,7 @@ private[camel] object Consumer { } } /** - * For internal use only. + * INTERNAL API * Captures the configuration of the Consumer. */ private[camel] case class ConsumerConfig(activationTimeout: FiniteDuration, replyTimeout: FiniteDuration, autoAck: Boolean, onRouteDefinition: RouteDefinition ⇒ ProcessorDefinition[_]) extends NoSerializationVerificationNeeded diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 64ab0f61f8..7e96e4aa93 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -15,7 +15,7 @@ import org.apache.camel.processor.SendProcessor * Support trait for producing messages to Camel endpoints. */ trait ProducerSupport extends Actor with CamelSupport { - private[this] var messages = Map[ActorRef, Any]() + private[this] var messages = Vector.empty[(ActorRef, Any)] private[this] var producerChild: Option[ActorRef] = None override def preStart() { @@ -63,9 +63,9 @@ trait ProducerSupport extends Actor with CamelSupport { messages = { for ( child ← producerChild; - (sender, msg) ← messages - ) child.tell(transformOutgoingMessage(msg), sender) - Map() + (snd, msg) ← messages + ) child.tell(transformOutgoingMessage(msg), snd) + Vector.empty } } case res: MessageResult ⇒ routeResponse(res.message) @@ -77,7 +77,7 @@ trait ProducerSupport extends Actor with CamelSupport { case msg ⇒ producerChild match { case Some(child) ⇒ child forward transformOutgoingMessage(msg) - case None ⇒ messages += (sender -> msg) + case None ⇒ messages :+= ((sender, msg)) } } @@ -117,8 +117,7 @@ trait ProducerSupport extends Actor with CamelSupport { * as argument to receiveAfterProduce. If the response is received synchronously from * the endpoint then receiveAfterProduce is called synchronously as well. If the * response is received asynchronously, the receiveAfterProduce is called - * asynchronously. The original - * sender and senderFuture are preserved. + * asynchronously. The original sender is preserved. * * @see CamelMessage#canonicalize(Any) * @param endpoint the endpoint @@ -157,14 +156,12 @@ trait Producer extends ProducerSupport { this: Actor ⇒ } /** - * For internal use only. - * + * INTERNAL API */ private case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded /** - * For internal use only. - * + * INTERNAL API */ private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala index 2208cad046..a115b4d614 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala @@ -18,15 +18,15 @@ private[camel] object ActivationProtocol { private[camel] abstract class ActivationMessage(val actor: ActorRef) extends Serializable /** - * For internal use only. companion object of ActivationMessage - * + * INTERNAL API + * companion object of ActivationMessage */ private[camel] object ActivationMessage { def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor) } /** - * For internal use only. + * INTERNAL API * Event message indicating that a single endpoint has been activated. * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] * to await activation or de-activation of endpoints. @@ -36,7 +36,7 @@ private[camel] object ActivationProtocol { final case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) /** - * For internal use only. + * INTERNAL API * Event message indicating that a single endpoint failed to activate. * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] * to await activation or de-activation of endpoints. @@ -47,7 +47,7 @@ private[camel] object ActivationProtocol { final case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef) /** - * For internal use only. + * INTERNAL API * Event message indicating that a single endpoint was de-activated. * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] * to await activation or de-activation of endpoints. @@ -57,7 +57,7 @@ private[camel] object ActivationProtocol { final case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) /** - * For internal use only. + * INTERNAL API * Event message indicating that a single endpoint failed to de-activate. * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] * to await activation or de-activation of endpoints. diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala index 17ee254bd5..a85cb665d2 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala @@ -9,7 +9,8 @@ import collection.mutable.WeakHashMap import akka.camel.internal.ActivationProtocol._ /** - * For internal use only. An actor that tracks activation and de-activation of endpoints. + * INTERNAL API + * An actor that tracks activation and de-activation of endpoints. */ private[camel] class ActivationTracker extends Actor with ActorLogging { @@ -108,14 +109,15 @@ private[camel] class ActivationTracker extends Actor with ActorLogging { } /** - * For internal use only. A request message to the ActivationTracker for the status of activation. + * INTERNAL API + * A request message to the ActivationTracker for the status of activation. * @param ref the actorRef */ private[camel] case class AwaitActivation(ref: ActorRef) extends ActivationMessage(ref) /** - * For internal use only. A request message to the ActivationTracker for the status of de-activation. - * For internal use only. + * INTERNAL API + * A request message to the ActivationTracker for the status of de-activation. * @param ref the actorRef */ private[camel] case class AwaitDeActivation(ref: ActorRef) extends ActivationMessage(ref) diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index 19548a4b50..edba8b36d2 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -8,13 +8,11 @@ import org.apache.camel.{ Exchange, Message ⇒ JCamelMessage } import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage } /** - * For internal use only. - * Adapter for converting an [[org.apache.camel.Exchange]] to and from [[akka.camel.CamelMessage]] and [[akka.camel.Failure]] objects. - * The org.apache.camel.Message is mutable and not suitable to be used directly as messages between Actors. - * This adapter is used to convert to immutable messages to be used with Actors, and convert the immutable messages back - * to org.apache.camel.Message when using Camel. - * - * + * INTERNAL API + * Adapter for converting an [[org.apache.camel.Exchange]] to and from [[akka.camel.CamelMessage]] and [[akka.camel.Failure]] objects. + * The org.apache.camel.Message is mutable and not suitable to be used directly as messages between Actors. + * This adapter is used to convert to immutable messages to be used with Actors, and convert the immutable messages back + * to org.apache.camel.Message when using Camel. */ private[camel] class CamelExchangeAdapter(val exchange: Exchange) { /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala index 5a5083f07e..d16b387f91 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -15,7 +15,7 @@ import akka.AkkaException import akka.camel.internal.ActivationProtocol._ /** - * For internal use only. + * INTERNAL API * Top level supervisor for internal Camel actors */ private[camel] class CamelSupervisor extends Actor with CamelSupport { @@ -36,7 +36,7 @@ private[camel] class CamelSupervisor extends Actor with CamelSupport { } /** - * For internal use only. + * INTERNAL API * Messages for the camel supervisor, registrations and de-registrations. */ private[camel] object CamelSupervisor { @@ -45,46 +45,46 @@ private[camel] object CamelSupervisor { sealed trait CamelSupervisorMessage extends Serializable /** - * For internal use only. + * INTERNAL API * Registers a consumer or a producer. */ case class Register(actorRef: ActorRef, endpointUri: String, config: Option[ConsumerConfig] = None) extends NoSerializationVerificationNeeded /** - * For internal use only. + * INTERNAL API * De-registers a producer or a consumer. */ @SerialVersionUID(1L) case class DeRegister(actorRef: ActorRef) extends CamelSupervisorMessage /** - * For internal use only. + * INTERNAL API * Adds a watch for the actor */ @SerialVersionUID(1L) case class AddWatch(actorRef: ActorRef) extends CamelSupervisorMessage /** - * For internal use only. + * INTERNAL API * Provides a Producer with the required camel objects to function. */ case class CamelProducerObjects(endpoint: Endpoint, processor: SendProcessor) extends NoSerializationVerificationNeeded } /** - * For internal use only. + * INTERNAL API * Thrown by registrars to indicate that the actor could not be de-activated. */ private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause) /** - * For internal use only. + * INTERNAL API * Thrown by the registrars to indicate that the actor could not be activated. */ private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause) /** - * For internal use only. + * INTERNAL API * Registry for Camel Consumers and Producers. Supervises the registrars. */ private[camel] class Registry(activationTracker: ActorRef) extends Actor with CamelSupport { @@ -138,7 +138,7 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca } /** - * For internal use only. + * INTERNAL API * Registers Producers. */ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport { @@ -176,7 +176,7 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto } /** - * For internal use only. + * INTERNAL API * Registers Consumers. */ private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport { diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala index 27f25af1ab..5494cf1287 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala @@ -11,7 +11,7 @@ import org.apache.camel.builder.RouteBuilder import org.apache.camel.model.RouteDefinition /** - * For internal use only. + * INTERNAL API * Builder of a route to a target which can be an actor. * * @param endpointUri endpoint URI of the consumer actor. diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 0732ab4971..0902ad812c 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -20,7 +20,7 @@ import org.apache.camel.model.RouteDefinition import akka.actor.{ ExtendedActorSystem, ActorRef, Props, ActorSystem } /** - * For internal use only. + * INTERNAL API * Creates an instance of the Camel subsystem. * * @param system is used to create internal actors needed by camel instance. @@ -30,9 +30,6 @@ import akka.actor.{ ExtendedActorSystem, ActorRef, Props, ActorSystem } */ private[camel] class DefaultCamel(val system: ExtendedActorSystem) extends Camel { val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor") - /** - * For internal use only. - */ private[camel] implicit val log = Logging(system, "Camel") lazy val context: DefaultCamelContext = { @@ -52,7 +49,7 @@ private[camel] class DefaultCamel(val system: ExtendedActorSystem) extends Camel /** * Starts camel and underlying camel context and template. * Only the creator of Camel should start and stop it. - * @see akka.camel.DefaultCamel#stop() + * @see akka.camel.DefaultCamel#shutdown() */ def start(): this.type = { context.start() diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 420957c0f8..b54b1a5fcb 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -22,7 +22,7 @@ import support.TypeConverterSupport import scala.util.{ Failure, Success, Try } /** - * For internal use only. + * INTERNAL API * Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]]. * The `ActorComponent` is a Camel [[org.apache.camel.Component]]. * @@ -42,7 +42,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D } /** - * For internal use only. + * INTERNAL API * Does what an endpoint does, creates consumers and producers for the component. The `ActorEndpoint` is a Camel [[org.apache.camel.Endpoint]] that is used to * receive messages from Camel. Sending messages from the `ActorComponent` is not supported, a [[akka.camel.Producer]] actor should be used instead. * @@ -85,7 +85,7 @@ private[camel] class ActorEndpoint(uri: String, } /** - * For internal use only. + * INTERNAL API * Configures the `ActorEndpoint`. This needs to be a `bean` for Camel purposes. * */ @@ -127,7 +127,8 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex def process(exchange: Exchange, callback: AsyncCallback): Boolean = processExchangeAdapter(new CamelExchangeAdapter(exchange), callback) /** - * For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]] + * INTERNAL API + * Processes the [[akka.camel.internal.CamelExchangeAdapter]] * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]] */ private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = { @@ -137,7 +138,8 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } /** - * For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]]. + * INTERNAL API + * Processes the [[akka.camel.internal.CamelExchangeAdapter]]. * This method is blocking when the exchange is inOnly. The method returns true if it executed synchronously/blocking. * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]] * @param callback the callback @@ -184,7 +186,8 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } /** - * For internal use only. Converts Strings to [[scala.concurrent.duration.Duration]] + * INTERNAL API + * Converts Strings to [[scala.concurrent.duration.Duration]] */ private[camel] object DurationTypeConverter extends TypeConverterSupport { @@ -198,7 +201,8 @@ private[camel] object DurationTypeConverter extends TypeConverterSupport { } /** - * For internal use only. An endpoint to an [[akka.actor.ActorRef]] + * INTERNAL API + * An endpoint to an [[akka.actor.ActorRef]] * @param actorPath the String representation of the path to the actor */ private[camel] case class ActorEndpointPath private (actorPath: String) { @@ -243,7 +247,8 @@ object CamelPath { } /** - * For internal use only. Companion of `ActorEndpointPath` + * INTERNAL API + * Companion of `ActorEndpointPath` */ private[camel] case object ActorEndpointPath { diff --git a/akka-docs/rst/java/camel.rst b/akka-docs/rst/java/camel.rst index 1dc622d79f..a9d357e685 100644 --- a/akka-docs/rst/java/camel.rst +++ b/akka-docs/rst/java/camel.rst @@ -5,22 +5,6 @@ Camel ############# -Additional Resources -==================== -For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. - -For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ -(pdf) of the book `Camel in Action`_. - -.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf -.. _Camel in Action: http://www.manning.com/ibsen/ -.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x - -Other, more advanced external articles (for version 1) are: - -* `Akka Consumer Actors: New Features and Best Practices `_ -* `Akka Producer Actors: New Features and Best Practices `_ - Introduction ============ @@ -308,8 +292,8 @@ In-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both, consumer and producer actors. -* A consumer endpoint sends request messages to its consumer actor using the ``!`` - (tell) operator and the actor returns responses with ``sender !`` once they are +* A consumer endpoint sends request messages to its consumer actor using the ``tell`` + method and the actor returns responses with ``getSender().tell`` once they are ready. * A producer actor sends request messages to its endpoint using Camel's @@ -411,7 +395,7 @@ The following URI options are supported: | | | | See also :ref:`camel-acknowledgements-java`. | +--------------+----------+---------+------------------------------------------------+ -Here's an actor endpoint URI example containing an actor uuid:: +Here's an actor endpoint URI example containing an actor path:: akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis @@ -582,3 +566,19 @@ seconds: For more information about the Camel Quartz component, see here: http://camel.apache.org/quartz.html + +Additional Resources +==================== +For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. + +For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ +(pdf) of the book `Camel in Action`_. + +.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf +.. _Camel in Action: http://www.manning.com/ibsen/ +.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x + +Other, more advanced external articles (for version 1) are: + +* `Akka Consumer Actors: New Features and Best Practices `_ +* `Akka Producer Actors: New Features and Best Practices `_ diff --git a/akka-docs/rst/java/code/docs/camel/sample/route/Consumer3.java b/akka-docs/rst/java/code/docs/camel/sample/route/Consumer3.java index 5daba13cf9..05cd7eb85a 100644 --- a/akka-docs/rst/java/code/docs/camel/sample/route/Consumer3.java +++ b/akka-docs/rst/java/code/docs/camel/sample/route/Consumer3.java @@ -19,8 +19,9 @@ public class Consumer3 extends UntypedConsumerActor{ public void onReceive(Object message) { if (message instanceof CamelMessage) { CamelMessage camelMessage = (CamelMessage) message; - transformer.forward(camelMessage.getBodyAs(String.class, getCamelContext()), - getContext()); + // Forward a string representation of the message body to transformer + String body = camelMessage.getBodyAs(String.class, getCamelContext()); + transformer.forward(camelMessage.withBody(body), getContext()); } else unhandled(message); } diff --git a/akka-docs/rst/scala/camel.rst b/akka-docs/rst/scala/camel.rst index 7467f310e9..1861948fe8 100644 --- a/akka-docs/rst/scala/camel.rst +++ b/akka-docs/rst/scala/camel.rst @@ -5,22 +5,6 @@ Camel ############## -Additional Resources -==================== -For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. - -For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ -(pdf) of the book `Camel in Action`_. - -.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf -.. _Camel in Action: http://www.manning.com/ibsen/ -.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x - -Other, more advanced external articles (for version 1) are: - -* `Akka Consumer Actors: New Features and Best Practices `_ -* `Akka Producer Actors: New Features and Best Practices `_ - Introduction ============ @@ -407,7 +391,7 @@ The following URI options are supported: | | | | See also :ref:`camel-acknowledgements`. | +--------------+----------+---------+-------------------------------------------+ -Here's an actor endpoint URI example containing an actor uuid:: +Here's an actor endpoint URI example containing an actor path:: akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis @@ -570,3 +554,19 @@ seconds: For more information about the Camel Quartz component, see here: http://camel.apache.org/quartz.html + +Additional Resources +==================== +For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. + +For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ +(pdf) of the book `Camel in Action`_. + +.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf +.. _Camel in Action: http://www.manning.com/ibsen/ +.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x + +Other, more advanced external articles (for version 1) are: + +* `Akka Consumer Actors: New Features and Best Practices `_ +* `Akka Producer Actors: New Features and Best Practices `_