From d0a50f66e73fb8d8fe0c64b6d96d1ffde9599f3e Mon Sep 17 00:00:00 2001 From: RayRoestenburg Date: Mon, 3 Sep 2012 12:08:46 +0200 Subject: [PATCH] Tickets #1924 #1925 #2383 #2387 #1927 #1926 Changed unlimited blocking to block up untill the replyTimeout, and added a test for it, where manual Ack is never received ticket #1926 removed unnecesary methods --- akka-camel/src/main/resources/reference.conf | 3 +- .../main/scala/akka/camel/Activation.scala | 16 +- .../src/main/scala/akka/camel/Camel.scala | 27 ++- .../main/scala/akka/camel/CamelMessage.scala | 163 ++++++--------- .../src/main/scala/akka/camel/Consumer.scala | 52 +++-- .../src/main/scala/akka/camel/Producer.scala | 113 ++++++---- .../camel/internal/ActivationMessage.scala | 101 +++++---- .../camel/internal/ActivationTracker.scala | 6 +- .../camel/internal/CamelExchangeAdapter.scala | 4 +- .../akka/camel/internal/CamelSupervisor.scala | 197 ++++++++++++++++++ .../internal/ConsumerActorRouteBuilder.scala | 39 ++++ .../camel/internal/ConsumerRegistry.scala | 151 -------------- .../akka/camel/internal/DefaultCamel.scala | 57 ++++- .../camel/internal/ProducerRegistry.scala | 86 -------- .../internal/component/ActorComponent.scala | 8 +- .../camel/javaapi/UntypedConsumerActor.scala | 7 +- .../camel/javaapi/UntypedProducerActor.scala | 3 +- akka-camel/src/main/scala/akka/package.scala | 2 +- .../java/akka/camel/ConsumerJavaTestBase.java | 72 +++---- .../java/akka/camel/CustomRouteTestBase.java | 28 ++- .../java/akka/camel/MessageJavaTestBase.java | 28 +-- .../camel/SampleErrorHandlingConsumer.java | 16 +- .../java/akka/camel/SampleUntypedActor.java | 16 -- .../src/test/resources/logback-test.xml | 23 ++ .../camel/ActivationIntegrationTest.scala | 38 ++-- .../scala/akka/camel/CamelMessageTest.scala | 15 +- .../akka/camel/ConcurrentActivationTest.scala | 159 ++++++++++++++ .../akka/camel/ConsumerIntegrationTest.scala | 110 ++++++---- .../scala/akka/camel/DefaultCamelTest.scala | 5 +- .../scala/akka/camel/MessageScalaTest.scala | 27 +-- .../akka/camel/ProducerFeatureTest.scala | 69 +++--- .../akka/camel/ProducerRegistryTest.scala | 53 ----- .../test/scala/akka/camel/TestSupport.scala | 9 +- .../akka/camel/UntypedProducerTest.scala | 6 +- .../internal/ActivationTrackerTest.scala | 4 +- .../component/ActorProducerTest.scala | 69 +++++- .../component/DurationConverterTest.scala | 3 +- .../code/docs/camel/ActivationTestBase.java | 7 +- .../rst/java/code/docs/camel/Consumer4.java | 2 +- .../docs/camel/ErrorThrowingConsumer.java | 13 +- .../rst/java/code/docs/camel/Responder.java | 4 +- .../rst/java/code/docs/camel/Transformer.java | 4 +- .../camel/sample/http/HttpTransformer.java | 4 +- .../docs/camel/sample/route/Transformer.java | 4 +- .../scala/code/docs/camel/CustomRoute.scala | 5 +- .../scala/code/docs/camel/HttpExample.scala | 2 +- project/AkkaBuild.scala | 10 +- 47 files changed, 1056 insertions(+), 784 deletions(-) create mode 100644 akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala create mode 100644 akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala delete mode 100644 akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala delete mode 100644 akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala delete mode 100644 akka-camel/src/test/java/akka/camel/SampleUntypedActor.java create mode 100644 akka-camel/src/test/resources/logback-test.xml create mode 100644 akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala delete mode 100644 akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala diff --git a/akka-camel/src/main/resources/reference.conf b/akka-camel/src/main/resources/reference.conf index 76be54088d..302a31b754 100644 --- a/akka-camel/src/main/resources/reference.conf +++ b/akka-camel/src/main/resources/reference.conf @@ -9,7 +9,8 @@ akka { camel { # Whether JMX should be enabled or disabled for the Camel Context jmx = off - + # enable/disable streaming cache on the Camel Context + streamingCache = on consumer { # Configured setting which determines whether one-way communications between an endpoint and this consumer actor # should be auto-acknowledged or application-acknowledged. diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index 4af600215c..a12abc7d0c 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -10,16 +10,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration import concurrent.{ ExecutionContext, Future } -import scala.concurrent.util.FiniteDuration /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. * The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated. */ trait Activation { - def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it? - - private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level? /** * Produces a Future with the specified endpoint that will be completed when the endpoint has been activated, @@ -28,11 +24,7 @@ trait Activation { * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] = - (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointActivated(`endpoint`) ⇒ endpoint - case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause - }) + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] /** * Produces a Future which will be completed when the given endpoint has been deactivated or @@ -41,9 +33,5 @@ trait Activation { * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] = - (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointDeActivated(`endpoint`) ⇒ endpoint - case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause - }) + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] } \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala index 258c7ec2ad..43682c49c5 100644 --- a/akka-camel/src/main/scala/akka/camel/Camel.scala +++ b/akka-camel/src/main/scala/akka/camel/Camel.scala @@ -6,7 +6,8 @@ package akka.camel import internal._ import akka.actor._ -import org.apache.camel.{ ProducerTemplate, CamelContext } +import org.apache.camel.ProducerTemplate +import org.apache.camel.impl.DefaultCamelContext import com.typesafe.config.Config import scala.concurrent.util.Duration import java.util.concurrent.TimeUnit._ @@ -17,16 +18,16 @@ import scala.concurrent.util.FiniteDuration * '''Note:''' `CamelContext` and `ProducerTemplate` are stopped when the associated actor system is shut down. * This trait can be obtained through the [[akka.camel.CamelExtension]] object. */ -trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with Activation { +trait Camel extends Extension with Activation { /** * Underlying camel context. * * It can be used to configure camel manually, i.e. when the user wants to add new routes or endpoints, * i.e. {{{camel.context.addRoutes(...)}}} * - * @see [[org.apache.camel.CamelContext]] + * @see [[org.apache.camel.impl.DefaultCamelContext]] */ - def context: CamelContext + def context: DefaultCamelContext /** * The Camel ProducerTemplate. @@ -38,6 +39,16 @@ trait Camel extends ConsumerRegistry with ProducerRegistry with Extension with A * The settings for the CamelExtension */ def settings: CamelSettings + + /** + * For internal use only. Returns the camel supervisor actor. + */ + private[camel] def supervisor: ActorRef + + /** + * For internal use only. Returns the associated ActorSystem. + */ + private[camel] def system: ActorSystem } /** @@ -68,6 +79,12 @@ class CamelSettings private[camel] (config: Config) { * */ final val jmxStatistics: Boolean = config.getBoolean("akka.camel.jmx") + + /** + * enables or disables streamingCache on the Camel Context + */ + final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache") + } /** @@ -97,4 +114,4 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider { override def lookup(): ExtensionId[Camel] = CamelExtension override def get(system: ActorSystem): Camel = super.get(system) -} \ No newline at end of file +} diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index 6c45a66bf4..3e0f0734b0 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -8,14 +8,14 @@ import java.util.{ Map ⇒ JMap, Set ⇒ JSet } import scala.collection.JavaConversions._ -import akka.japi.{ Function ⇒ JFunction } import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage } import akka.AkkaException import scala.reflect.ClassTag +import akka.dispatch.Mapper +import util.{ Success, Failure, Try } /** * An immutable representation of a Camel message. - * * @author Martin Krasser */ case class CamelMessage(body: Any, headers: Map[String, Any]) { @@ -47,86 +47,59 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) { */ def getHeaders: JMap[String, Any] = headers - /** - * Returns the header with given name. Throws NoSuchElementException - * if the header doesn't exist. - */ - def header(name: String): Option[Any] = headers.get(name) - - /** - * Returns the header with given name. Throws NoSuchElementException - * if the header doesn't exist. - *

- * Java API - */ - def getHeader(name: String): Any = headers(name) - - /** - * Creates a CamelMessage with a transformed body using a transformer function. - */ - def mapBody[A, B](transformer: A ⇒ B): CamelMessage = withBody(transformer(body.asInstanceOf[A])) - - /** - * Creates a CamelMessage with a transformed body using a transformer function. - *

- * Java API - */ - def mapBody[A, B](transformer: JFunction[A, B]): CamelMessage = withBody(transformer(body.asInstanceOf[A])) - - /** - * Creates a CamelMessage with a given body. - */ - def withBody(body: Any): CamelMessage = CamelMessage(body, this.headers) - - /** - * Creates a new CamelMessage with given headers. - */ - def withHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, headers) - /** * Creates a new CamelMessage with given headers. A copy of the headers map is made. *

* Java API */ - def withHeaders[A](headers: JMap[String, A]): CamelMessage = withHeaders(headers.toMap) + def withHeaders[A](headers: JMap[String, A]): CamelMessage = copy(this.body, headers.toMap) /** - * Creates a new CamelMessage with given headers added to the current headers. - */ - def addHeaders[A](headers: Map[String, A]): CamelMessage = copy(this.body, this.headers ++ headers) - - /** - * Creates a new CamelMessage with given headers added to the current headers. - * A copy of the headers map is made. + * Returns the header by given name parameter in a [[scala.util.Try]]. The header is converted to type T, which is returned + * in a [[scala.util.Success]]. If an exception occurs during the conversion to the type T or when the header cannot be found, + * the exception is returned in a [[scala.util.Failure]]. + * *

- * Java API + * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]] + * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]]. + * */ - def addHeaders[A](headers: JMap[String, A]): CamelMessage = addHeaders(headers.toMap) - - /** - * Creates a new CamelMessage with the given header added to the current headers. - */ - def addHeader(header: (String, Any)): CamelMessage = copy(this.body, this.headers + header) - - /** - * Creates a new CamelMessage with the given header, represented by name and - * value added to the existing headers. - *

- * Java API - */ - def addHeader(name: String, value: Any): CamelMessage = addHeader((name, value)) - - /** - * Creates a new CamelMessage where the header with given headerName is removed from - * the existing headers. - */ - def withoutHeader(headerName: String): CamelMessage = copy(this.body, this.headers - headerName) - - def copyContentTo(to: JCamelMessage): Unit = { - to.setBody(this.body) - for ((name, value) ← this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef]) + def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Try[T] = { + def tryHeader = headers.get(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _)) match { + case Some(header) ⇒ header + case None ⇒ throw new NoSuchElementException(name) + } + Try(tryHeader) } + /** + * Returns the header by given name parameter. The header is converted to type T as defined by the clazz parameter. + * An exception is thrown when the conversion to the type T fails or when the header cannot be found. + *

+ * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]] + * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]]. + *

+ * Java API + */ + def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext) match { + case Success(header) ⇒ header + case Failure(t) ⇒ throw t + } + + /** + * Returns a new CamelMessage with a transformed body using a transformer function. + * This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A. + */ + def mapBody[A, B](transformer: A ⇒ B): CamelMessage = copy(body = transformer(body.asInstanceOf[A])) + + /** + * Returns a new CamelMessage with a transformed body using a transformer function. + * This method will throw a [[java.lang.ClassCastException]] if the body cannot be mapped to type A. + *

+ * Java API + */ + def mapBody[A, B](transformer: Mapper[A, B]): CamelMessage = copy(body = transformer(body.asInstanceOf[A])) + /** * Returns the body of the message converted to the type T. Conversion is done * using Camel's type converter. The type converter is obtained from the CamelContext that is passed in. @@ -148,6 +121,12 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) { */ def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) + /** + * Returns a new CamelMessage with a new body, while keeping the same headers. + *

+ * Java API + */ + def withBody[T](body: T) = this.copy(body = body) /** * Creates a CamelMessage with current body converted to type T. * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]] @@ -163,28 +142,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) { *

* Java API */ - def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = withBody(getBodyAs(clazz, camelContext)) - - /** - * Returns the header with given name converted to type T. Throws - * NoSuchElementException if the header doesn't exist. - *

- * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]] - * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]]. - * - */ - def headerAs[T](name: String)(implicit t: ClassTag[T], camelContext: CamelContext): Option[T] = header(name).map(camelContext.getTypeConverter.mandatoryConvertTo[T](t.runtimeClass.asInstanceOf[Class[T]], _)) - - /** - * Returns the header with given name converted to type as given by the clazz - * parameter. Throws NoSuchElementException if the header doesn't exist. - *

- * The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]] - * using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]]. - *

- * Java API - */ - def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(ClassTag(clazz), camelContext).get + def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = copy(body = getBodyAs(clazz, camelContext)) } @@ -208,24 +166,28 @@ object CamelMessage { * CamelMessage then msg is returned, otherwise msg is set as body of a * newly created CamelMessage object. */ - def canonicalize(msg: Any) = msg match { + private[camel] def canonicalize(msg: Any) = msg match { case mobj: CamelMessage ⇒ mobj case body ⇒ CamelMessage(body, Map.empty) } - /** - * Creates a new CamelMessage object from the Camel message. - */ - def from(camelMessage: JCamelMessage): CamelMessage = from(camelMessage, Map.empty) - /** * Creates a new CamelMessage object from the Camel message. * * @param headers additional headers to set on the created CamelMessage in addition to those * in the Camel message. */ - def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders) + private[camel] def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = CamelMessage(camelMessage.getBody, headers ++ camelMessage.getHeaders) + /** + * For internal use only. + * copies the content of this CamelMessage to an Apache Camel Message. + * + */ + private[camel] def copyContent(from: CamelMessage, to: JCamelMessage): Unit = { + to.setBody(from.body) + for ((name, value) ← from.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef]) + } } /** @@ -242,7 +204,6 @@ case object Ack { * An exception indicating that the exchange to the camel endpoint failed. * It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn * message or Exchange.getOut message, depending on the exchange pattern. - * */ class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any]) extends AkkaException(cause.getMessage, cause) { diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 89f7719e23..61ba9a8cc4 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -4,20 +4,19 @@ package akka.camel -import language.postfixOps -import internal.component.DurationTypeConverter +import akka.camel.internal.CamelSupervisor.Register import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition } import akka.actor._ -import scala.concurrent.util.Duration -import scala.concurrent.util.duration._ import scala.concurrent.util.FiniteDuration +import akka.dispatch.Mapper /** * Mixed in by Actor implementations that consume message from Camel endpoints. * * @author Martin Krasser */ -trait Consumer extends Actor with CamelSupport with ConsumerConfig { +trait Consumer extends Actor with CamelSupport { + import Consumer._ /** * Must return the Camel endpoint URI that the consumer wants to consume messages from. */ @@ -33,11 +32,13 @@ trait Consumer extends Actor with CamelSupport with ConsumerConfig { // with order of execution of trait body in the Java version (UntypedConsumerActor) // where getEndpointUri is called before its constructor (where a uri is set to return from getEndpointUri) // and remains null. CustomRouteTest provides a test to verify this. - camel.registerConsumer(endpointUri, this, activationTimeout) + register() + } + + private[this] def register() { + camel.supervisor ! Register(self, endpointUri, Some(ConsumerConfig(activationTimeout, replyTimeout, autoAck, onRouteDefinition))) } -} -trait ConsumerConfig { this: CamelSupport ⇒ /** * How long the actor should wait for activation before it fails. */ @@ -48,7 +49,7 @@ trait ConsumerConfig { this: CamelSupport ⇒ * the endpoint can take to send the response before the message exchange fails. It defaults to 1 minute. * This setting is used for out-capable, in-only, manually acknowledged communication. */ - def replyTimeout: Duration = camel.settings.replyTimeout + def replyTimeout: FiniteDuration = camel.settings.replyTimeout /** * Determines whether one-way communications between an endpoint and this consumer actor @@ -58,9 +59,36 @@ trait ConsumerConfig { this: CamelSupport ⇒ def autoAck: Boolean = camel.settings.autoAck /** - * The route definition handler for creating a custom route to this consumer instance. + * Returns the route definition handler for creating a custom route to this consumer. + * By default it returns an identity function, override this method to + * return a custom route definition handler. The returned function is not allowed to close over 'this', meaning it is + * not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues. */ - //FIXME: write a test confirming onRouteDefinition gets called - def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] = rd + def onRouteDefinition: RouteDefinition ⇒ ProcessorDefinition[_] = { + val mapper = getRouteDefinitionHandler + if (mapper != identityRouteMapper) mapper.apply _ + else identityRouteMapper + } + /** + * Java API. Returns the [[akka.dispatch.Mapper]] function that will be used as a route definition handler + * for creating custom route to this consumer. By default it returns an identity function, override this method to + * return a custom route definition handler. The [[akka.dispatch.Mapper]] is not allowed to close over 'this', meaning it is + * not allowed to refer to the actor instance itself, since that can easily cause concurrent shared state issues. + */ + def getRouteDefinitionHandler: Mapper[RouteDefinition, ProcessorDefinition[_]] = identityRouteMapper } + +/** + * Internal use only. + */ +private[camel] object Consumer { + val identityRouteMapper = new Mapper[RouteDefinition, ProcessorDefinition[_]]() { + override def checkedApply(rd: RouteDefinition): ProcessorDefinition[_] = rd + } +} +/** + * For internal use only. + * Captures the configuration of the Consumer. + */ +private[camel] case class ConsumerConfig(activationTimeout: FiniteDuration, replyTimeout: FiniteDuration, autoAck: Boolean, onRouteDefinition: RouteDefinition ⇒ ProcessorDefinition[_]) extends NoSerializationVerificationNeeded diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index c6e8c6714d..683ff4f20f 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -4,10 +4,11 @@ package akka.camel -import akka.actor.Actor +import akka.actor.{ Props, NoSerializationVerificationNeeded, ActorRef, Actor } +import internal.CamelSupervisor.{ CamelProducerObjects, Register } import internal.CamelExchangeAdapter import akka.actor.Status.Failure -import org.apache.camel.{ Endpoint, Exchange, ExchangePattern, AsyncCallback } +import org.apache.camel.{ Endpoint, ExchangePattern, AsyncCallback } import org.apache.camel.processor.SendProcessor /** @@ -15,9 +16,16 @@ import org.apache.camel.processor.SendProcessor * * @author Martin Krasser */ -trait ProducerSupport extends CamelSupport { this: Actor ⇒ +trait ProducerSupport extends Actor with CamelSupport { + private[this] var messages = Map[ActorRef, Any]() + private[this] var producerChild: Option[ActorRef] = None - protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri) + override def preStart() { + super.preStart() + register() + } + + private[this] def register() { camel.supervisor ! Register(self, endpointUri) } /** * CamelMessage headers to copy by default from request message to response-message. @@ -43,40 +51,6 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒ */ def headersToCopy: Set[String] = headersToCopyDefault - /** - * Initiates a message exchange of given pattern with the endpoint specified by - * endpointUri. The in-message of the initiated exchange is the canonical form - * of msg. After sending the in-message, the processing result (response) is passed - * as argument to receiveAfterProduce. If the response is received synchronously from - * the endpoint then receiveAfterProduce is called synchronously as well. If the - * response is received asynchronously, the receiveAfterProduce is called - * asynchronously. The original - * sender and senderFuture are preserved. - * - * @see CamelMessage#canonicalize(Any) - * - * @param msg message to produce - * @param pattern exchange pattern - */ - protected def produce(msg: Any, pattern: ExchangePattern): Unit = { - // Need copies of sender reference here since the callback could be done - // later by another thread. - val producer = self - val originalSender = sender - - val cmsg = CamelMessage.canonicalize(msg) - val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern)) - - xchg.setRequest(cmsg) - - processor.process(xchg.exchange, new AsyncCallback { - // Ignoring doneSync, sending back async uniformly. - def done(doneSync: Boolean): Unit = producer.tell( - if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy)) - else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender) - }) - } - /** * Produces msg to the endpoint specified by endpointUri. Before the message is * actually sent it is pre-processed by calling receiveBeforeProduce. If oneway @@ -85,12 +59,28 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒ * @see Producer#produce(Any, ExchangePattern) */ protected def produce: Receive = { + case CamelProducerObjects(endpoint, processor) ⇒ + if (producerChild.isEmpty) { + producerChild = Some(context.actorOf(Props(new ProducerChild(endpoint, processor)))) + messages = { + for ( + child ← producerChild; + (sender, msg) ← messages + ) child.tell(msg, sender) + Map() + } + } case res: MessageResult ⇒ routeResponse(res.message) case res: FailureResult ⇒ val e = new AkkaCamelException(res.cause, res.headers) routeResponse(Failure(e)) throw e - case msg ⇒ produce(transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut) + + case msg ⇒ + producerChild match { + case Some(child) ⇒ child forward msg + case None ⇒ messages += (sender -> msg) + } } /** @@ -117,8 +107,45 @@ trait ProducerSupport extends CamelSupport { this: Actor ⇒ protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg) -} + private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor { + def receive = { + case msg @ (_: FailureResult | _: MessageResult) ⇒ context.parent forward msg + case msg ⇒ produce(endpoint, processor, transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut) + } + /** + * Initiates a message exchange of given pattern with the endpoint specified by + * endpointUri. The in-message of the initiated exchange is the canonical form + * of msg. After sending the in-message, the processing result (response) is passed + * as argument to receiveAfterProduce. If the response is received synchronously from + * the endpoint then receiveAfterProduce is called synchronously as well. If the + * response is received asynchronously, the receiveAfterProduce is called + * asynchronously. The original + * sender and senderFuture are preserved. + * + * @see CamelMessage#canonicalize(Any) + * @param endpoint the endpoint + * @param processor the processor + * @param msg message to produce + * @param pattern exchange pattern + */ + protected def produce(endpoint: Endpoint, processor: SendProcessor, msg: Any, pattern: ExchangePattern): Unit = { + // Need copies of sender reference here since the callback could be done + // later by another thread. + val producer = self + val originalSender = sender + val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern)) + val cmsg = CamelMessage.canonicalize(msg) + xchg.setRequest(cmsg) + processor.process(xchg.exchange, new AsyncCallback { + // Ignoring doneSync, sending back async uniformly. + def done(doneSync: Boolean): Unit = producer.tell( + if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy)) + else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender) + }) + } + } +} /** * Mixed in by Actor implementations to produce messages to Camel endpoints. */ @@ -132,14 +159,16 @@ trait Producer extends ProducerSupport { this: Actor ⇒ } /** + * For internal use only. * @author Martin Krasser */ -private case class MessageResult(message: CamelMessage) +private case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded /** + * For internal use only. * @author Martin Krasser */ -private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) +private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded /** * A one-way producer. diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala index bdd915ff70..7b27dbc789 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala @@ -6,53 +6,64 @@ package akka.camel.internal import akka.actor.ActorRef -/** - * Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s - * is done asynchronously. Activation messages are sent in the Camel extension when endpoints are - * activated, de-activated, failed to activate and failed to de-activate. - * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] - * to await activation or de-activation of endpoints. - */ -private[camel] abstract class ActivationMessage(val actor: ActorRef) +private[camel] object ActivationProtocol { + /** + * Super class of all activation messages. Registration of the Camel [[akka.camel.Consumer]]s and [[akka.camel.Producer]]s + * is done asynchronously. Activation messages are sent in the Camel extension when endpoints are + * activated, de-activated, failed to activate and failed to de-activate. + * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] + * to await activation or de-activation of endpoints. + */ + @SerialVersionUID(1L) + private[camel] abstract class ActivationMessage(val actor: ActorRef) extends Serializable -/** - * For internal use only. companion object of ActivationMessage - * - */ -private[camel] object ActivationMessage { - def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor) -} + /** + * For internal use only. companion object of ActivationMessage + * + */ + private[camel] object ActivationMessage { + def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor) + } -/** - * Event message indicating that a single endpoint has been activated. - * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] - * to await activation or de-activation of endpoints. - * @param actorRef the endpoint that was activated - */ -private[camel] sealed case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) + /** + * For internal use only. + * Event message indicating that a single endpoint has been activated. + * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] + * to await activation or de-activation of endpoints. + * @param actorRef the endpoint that was activated + */ + @SerialVersionUID(1L) + final case class EndpointActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) -/** - * Event message indicating that a single endpoint failed to activate. - * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] - * to await activation or de-activation of endpoints. - * @param actorRef the endpoint that failed to activate - * @param cause the cause for failure - */ -private[camel] sealed case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef) + /** + * For internal use only. + * Event message indicating that a single endpoint failed to activate. + * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] + * to await activation or de-activation of endpoints. + * @param actorRef the endpoint that failed to activate + * @param cause the cause for failure + */ + @SerialVersionUID(1L) + final case class EndpointFailedToActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef) -/** - * Event message indicating that a single endpoint was de-activated. - * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] - * to await activation or de-activation of endpoints. - * @param actorRef the endpoint that was de-activated - */ -private[camel] sealed case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) + /** + * For internal use only. + * Event message indicating that a single endpoint was de-activated. + * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] + * to await activation or de-activation of endpoints. + * @param actorRef the endpoint that was de-activated + */ + @SerialVersionUID(1L) + final case class EndpointDeActivated(actorRef: ActorRef) extends ActivationMessage(actorRef) -/** - * Event message indicating that a single endpoint failed to de-activate. - * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] - * to await activation or de-activation of endpoints. - * @param actorRef the endpoint that failed to de-activate - * @param cause the cause for failure - */ -private[camel] sealed case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef) + /** + * For internal use only. + * Event message indicating that a single endpoint failed to de-activate. + * You can use the [[akka.camel.Activation]] trait which is available on [[akka.camel.Camel]] + * to await activation or de-activation of endpoints. + * @param actorRef the endpoint that failed to de-activate + * @param cause the cause for failure + */ + @SerialVersionUID(1L) + final case class EndpointFailedToDeActivate(actorRef: ActorRef, cause: Throwable) extends ActivationMessage(actorRef) +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala index f5a87eff25..43ca2701c6 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala @@ -7,6 +7,7 @@ package akka.camel.internal import akka.actor._ import collection.mutable.WeakHashMap import akka.camel._ +import internal.ActivationProtocol._ /** * For internal use only. An actor that tracks activation and de-activation of endpoints. @@ -93,11 +94,6 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging { } - /** - * Subscribes self to messages of type ActivationMessage - */ - override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ActivationMessage]) - override def receive = { case msg @ ActivationMessage(ref) ⇒ (activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg) diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index 5de9eb447d..5750856b37 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -30,14 +30,14 @@ private[camel] class CamelExchangeAdapter(val exchange: Exchange) { /** * Sets Exchange.getIn from the given CamelMessage object. */ - def setRequest(msg: CamelMessage): Unit = msg.copyContentTo(request) + def setRequest(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, request) /** * Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given * CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise * Exchange.getIn. */ - def setResponse(msg: CamelMessage): Unit = msg.copyContentTo(response) + def setResponse(msg: CamelMessage): Unit = CamelMessage.copyContent(msg, response) /** * Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala new file mode 100644 index 0000000000..1a7fff6f92 --- /dev/null +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -0,0 +1,197 @@ +package akka.camel.internal + +import akka.actor._ +import akka.camel.{ CamelSupport, ConsumerConfig } +import org.apache.camel.Endpoint +import org.apache.camel.processor.SendProcessor +import scala.util.control.NonFatal +import akka.actor.Terminated +import akka.actor.SupervisorStrategy.Resume +import akka.camel.internal.CamelSupervisor._ +import akka.AkkaException +import akka.camel.internal.ActivationProtocol._ + +/** + * For internal use only. + * Top level supervisor for internal Camel actors + */ +private[camel] class CamelSupervisor extends Actor with CamelSupport { + private val activationTracker = context.actorOf(Props[ActivationTracker], "activationTracker") + private val registry: ActorRef = context.actorOf(Props(new Registry(activationTracker)), "registry") + + override val supervisorStrategy = OneForOneStrategy() { + case NonFatal(e) ⇒ + Resume + } + + def receive = { + case AddWatch(actorRef) ⇒ context.watch(actorRef) + case Terminated(actorRef) ⇒ registry ! DeRegister(actorRef) + case msg: ActivationMessage ⇒ activationTracker forward msg + case msg ⇒ registry forward (msg) + } +} + +/** + * For internal use only. + * Messages for the camel supervisor, registrations and de-registrations. + */ +private[camel] object CamelSupervisor { + + @SerialVersionUID(1L) + sealed trait CamelSupervisorMessage extends Serializable + + /** + * For internal use only. + * Registers a consumer or a producer. + */ + case class Register(actorRef: ActorRef, endpointUri: String, config: Option[ConsumerConfig] = None) extends NoSerializationVerificationNeeded + + /** + * For internal use only. + * De-registers a producer or a consumer. + */ + @SerialVersionUID(1L) + case class DeRegister(actorRef: ActorRef) extends CamelSupervisorMessage + + /** + * For internal use only. + * Adds a watch for the actor + */ + @SerialVersionUID(1L) + case class AddWatch(actorRef: ActorRef) extends CamelSupervisorMessage + + /** + * For internal use only. + * Provides a Producer with the required camel objects to function. + */ + case class CamelProducerObjects(endpoint: Endpoint, processor: SendProcessor) extends NoSerializationVerificationNeeded +} + +/** + * For internal use only. + * Thrown by registrars to indicate that the actor could not be de-activated. + */ +private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause) + +/** + * For internal use only. + * Thrown by the registrars to indicate that the actor could not be activated. + */ +private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause) + +/** + * For internal use only. + * Registry for Camel Consumers and Producers. Supervises the registrars. + */ +private[camel] class Registry(activationTracker: ActorRef) extends Actor with CamelSupport { + import context.{ stop, parent } + + private val producerRegistrar = context.actorOf(Props(new ProducerRegistrar(activationTracker)), "producerRegistrar") + private val consumerRegistrar = context.actorOf(Props(new ConsumerRegistrar(activationTracker)), "consumerRegistrar") + private var producers = Set[ActorRef]() + private var consumers = Set[ActorRef]() + + override val supervisorStrategy = OneForOneStrategy() { + case e: ActorActivationException ⇒ + activationTracker ! EndpointFailedToActivate(e.actorRef, e.getCause) + stop(e.actorRef) + Resume + case e: ActorDeActivationException ⇒ + activationTracker ! EndpointFailedToDeActivate(e.actorRef, e.getCause) + stop(e.actorRef) + Resume + case NonFatal(e) ⇒ + Resume + } + + def receive = { + case msg @ Register(consumer, _, Some(_)) ⇒ + if (!consumers(consumer)) { + consumers += consumer + consumerRegistrar forward msg + parent ! AddWatch(consumer) + } + case msg @ Register(producer, _, None) ⇒ + if (!producers(producer)) { + producers += producer + producerRegistrar forward msg + parent ! AddWatch(producer) + } + case DeRegister(actorRef) ⇒ + producers.find(_ == actorRef).foreach { p ⇒ + deRegisterProducer(p) + producers -= p + } + consumers.find(_ == actorRef).foreach { c ⇒ + deRegisterConsumer(c) + consumers -= c + } + } + + private def deRegisterConsumer(actorRef: ActorRef) { consumerRegistrar ! DeRegister(actorRef) } + + private def deRegisterProducer(actorRef: ActorRef) { producerRegistrar ! DeRegister(actorRef) } +} + +/** + * For internal use only. + * Registers Producers. + */ +private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport { + private var camelObjects = Map[ActorRef, (Endpoint, SendProcessor)]() + + def receive = { + case Register(producer, endpointUri, _) ⇒ + if (!camelObjects.contains(producer)) { + try { + val endpoint = camelContext.getEndpoint(endpointUri) + val processor = new SendProcessor(endpoint) + camelObjects += producer -> (endpoint, processor) + // if this throws, the supervisor stops the producer and de-registers it on termination + processor.start() + producer ! CamelProducerObjects(endpoint, processor) + activationTracker ! EndpointActivated(producer) + } catch { + case NonFatal(e) ⇒ throw new ActorActivationException(producer, e) + } + } + case DeRegister(producer) ⇒ + camelObjects.get(producer).foreach { + case (_, processor) ⇒ + try { + camelObjects.get(producer).foreach(_._2.stop()) + camelObjects -= producer + activationTracker ! EndpointDeActivated(producer) + } catch { + case NonFatal(e) ⇒ throw new ActorDeActivationException(producer, e) + } + } + } +} + +/** + * For internal use only. + * Registers Consumers. + */ +private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Actor with CamelSupport { + def receive = { + case Register(consumer, endpointUri, Some(consumerConfig)) ⇒ + try { + // if this throws, the supervisor stops the consumer and de-registers it on termination + camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig)) + activationTracker ! EndpointActivated(consumer) + } catch { + case NonFatal(e) ⇒ throw new ActorActivationException(consumer, e) + } + case DeRegister(consumer) ⇒ + try { + val route = consumer.path.toString + camelContext.stopRoute(route) + camelContext.removeRoute(route) + activationTracker ! EndpointDeActivated(consumer) + } catch { + case NonFatal(e) ⇒ throw new ActorDeActivationException(consumer, e) + } + } +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala new file mode 100644 index 0000000000..cd8b70bed5 --- /dev/null +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.camel.internal + +import akka.camel._ +import component.CamelPath +import java.io.InputStream + +import org.apache.camel.builder.RouteBuilder + +import akka.actor._ +import org.apache.camel.model.RouteDefinition +import akka.serialization.Serializer + +/** + * For internal use only. + * Builder of a route to a target which can be an actor. + * + * @param endpointUri endpoint URI of the consumer actor. + * + * @author Martin Krasser + */ +private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { + + protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout) + + def configure() { + val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + val route = from(endpointUri).routeId(consumer.path.toString) + val converted = Conversions(scheme, route) + val userCustomized = applyUserRouteCustomization(converted) + userCustomized.to(targetActorUri) + } + + def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd) + +} diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala deleted file mode 100644 index 37fec6e1d0..0000000000 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camel.internal - -import akka.camel._ -import component.CamelPath -import java.io.InputStream -import org.apache.camel.builder.RouteBuilder -import akka.actor._ -import collection.mutable -import org.apache.camel.model.RouteDefinition -import org.apache.camel.CamelContext -import scala.concurrent.util.Duration -import concurrent.Await -import akka.util.Timeout -import scala.concurrent.util.FiniteDuration - -/** - * For internal use only. - * Manages consumer registration. Consumers call registerConsumer method to register themselves when they get created. - * ActorEndpoint uses it to lookup an actor by its path. - */ -private[camel] trait ConsumerRegistry { this: Activation ⇒ - def system: ActorSystem - def context: CamelContext - - /** - * For internal use only. - */ - private[this] lazy val idempotentRegistry = system.actorOf(Props(new IdempotentCamelConsumerRegistry(context))) - /** - * For internal use only. BLOCKING - * @param endpointUri the URI to register the consumer on - * @param consumer the consumer - * @param activationTimeout the timeout for activation - * @return the actorRef to the consumer - */ - private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: FiniteDuration) = { - idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) - Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout) - } -} - -/** - * For internal use only. - * Guarantees idempotent registration of camel consumer endpoints. - * - * Once registered the consumer is watched and unregistered upon termination. - * It also publishes events to the eventStream so interested parties could subscribe to them. - * The main consumer of these events is currently the ActivationTracker. - */ -private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext) extends Actor { - - case class UnregisterConsumer(actorRef: ActorRef) - - val activated = new mutable.HashSet[ActorRef] - - val registrator = context.actorOf(Props(new CamelConsumerRegistrator)) - - def receive = { - case msg @ RegisterConsumer(_, consumer, _) ⇒ - if (!isAlreadyActivated(consumer)) { - activated.add(consumer) - registrator ! msg - } - case msg @ EndpointActivated(consumer) ⇒ - context.watch(consumer) - context.system.eventStream.publish(msg) - case msg @ EndpointFailedToActivate(consumer, _) ⇒ - activated.remove(consumer) - context.system.eventStream.publish(msg) - case Terminated(ref) ⇒ - activated.remove(ref) - registrator ! UnregisterConsumer(ref) - case msg @ EndpointFailedToDeActivate(ref, cause) ⇒ context.system.eventStream.publish(msg) - case msg: EndpointDeActivated ⇒ context.system.eventStream.publish(msg) - } - - def isAlreadyActivated(ref: ActorRef): Boolean = activated.contains(ref) - - //FIXME Break out - class CamelConsumerRegistrator extends Actor with ActorLogging { - - def receive = { - case RegisterConsumer(endpointUri, consumer, consumerConfig) ⇒ - camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig)) - context.sender ! EndpointActivated(consumer) - log.debug("Published actor [{}] at endpoint [{}]", consumerConfig, endpointUri) - case UnregisterConsumer(consumer) ⇒ - camelContext.stopRoute(consumer.path.toString) - camelContext.removeRoute(consumer.path.toString) - context.sender ! EndpointDeActivated(consumer) - log.debug("Unpublished actor [{}] from endpoint [{}]", consumer, consumer.path) - } - - override def preRestart(reason: Throwable, message: Option[Any]) { - //FIXME check logic - super.preStart() - message match { - case Some(RegisterConsumer(_, consumer, _)) ⇒ sender ! EndpointFailedToActivate(consumer, reason) - case Some(UnregisterConsumer(consumer)) ⇒ sender ! EndpointFailedToDeActivate(consumer, reason) - case _ ⇒ - } - } - } -} - -/** - * For internal use only. A message to register a consumer. - * @param endpointUri the endpointUri to register to - * @param actorRef the actorRef to register as a consumer - * @param config the configuration for the consumer - */ -private[camel] case class RegisterConsumer(endpointUri: String, actorRef: ActorRef, config: ConsumerConfig) - -/** - * For internal use only. - * Builder of a route to a target which can be an actor. - * - * @param endpointUri endpoint URI of the consumer actor. - * - * @author Martin Krasser - */ -private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { - - protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout) - - def configure() { - val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - - val route = from(endpointUri).routeId(consumer.path.toString) - val converted = Conversions(scheme, route) - val userCustomized = applyUserRouteCustomization(converted) - userCustomized.to(targetActorUri) - } - - def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd) - - object Conversions { - private val bodyConversions = Map( - "file" -> classOf[InputStream]) - - def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match { - case Some(clazz) ⇒ routeDefinition.convertBodyTo(clazz) - case None ⇒ routeDefinition - } - } - -} \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 41aaacdf8c..016b923bf0 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -1,15 +1,21 @@ package akka.camel.internal -import akka.actor.ActorSystem +import akka.actor.{ ActorRef, Props, ActorSystem } import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent } import org.apache.camel.impl.DefaultCamelContext import scala.Predef._ import akka.event.Logging import akka.camel.{ CamelSettings, Camel } +import akka.camel.internal.ActivationProtocol._ import scala.util.control.NonFatal import scala.concurrent.util.Duration -import org.apache.camel.{ ProducerTemplate, CamelContext } import scala.concurrent.util.FiniteDuration +import org.apache.camel.ProducerTemplate +import concurrent.{ Future, ExecutionContext } +import akka.util.Timeout +import akka.pattern.ask +import java.io.InputStream +import org.apache.camel.model.RouteDefinition /** * For internal use only. @@ -21,16 +27,17 @@ import scala.concurrent.util.FiniteDuration * Also by not creating extra internal actor system we are conserving resources. */ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { + val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor") /** * For internal use only. */ private[camel] implicit val log = Logging(system, "Camel") - lazy val context: CamelContext = { + lazy val context: DefaultCamelContext = { val ctx = new DefaultCamelContext if (!settings.jmxStatistics) ctx.disableJMX() ctx.setName(system.name) - ctx.setStreamCaching(true) + ctx.setStreamCaching(settings.streamingCache) ctx.addComponent("akka", new ActorComponent(this, system)) ctx.getTypeConverterRegistry.addTypeConverter(classOf[FiniteDuration], classOf[String], DurationTypeConverter) ctx @@ -65,4 +72,46 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { } log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name) } + + /** + * Produces a Future with the specified endpoint that will be completed when the endpoint has been activated, + * or if it times out, which will happen after the specified Timeout. + * + * @param endpoint the endpoint to be activated + * @param timeout the timeout for the Future + */ + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] = + + (supervisor.ask(AwaitActivation(endpoint))(timeout)).map[ActorRef]({ + case EndpointActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause + }) + + /** + * Produces a Future which will be completed when the given endpoint has been deactivated or + * or if it times out, which will happen after the specified Timeout. + * + * @param endpoint the endpoint to be deactivated + * @param timeout the timeout of the Future + */ + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Timeout, executor: ExecutionContext): Future[ActorRef] = + (supervisor.ask(AwaitDeActivation(endpoint))(timeout)).map[ActorRef]({ + case EndpointDeActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause + }) } + +/** + * For internal use only. + */ +private[camel] object Conversions { + //FIXME Add this to the configuration, and move this functionality to the Camel Extension. + private val bodyConversions = Map( + "file" -> classOf[InputStream]) + + def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match { + case Some(clazz) ⇒ routeDefinition.convertBodyTo(clazz) + case None ⇒ routeDefinition + } +} + diff --git a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala deleted file mode 100644 index 8631039a11..0000000000 --- a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala +++ /dev/null @@ -1,86 +0,0 @@ -package akka.camel.internal - -import java.util.concurrent.ConcurrentHashMap -import org.apache.camel.processor.SendProcessor -import akka.actor.{ Props, ActorRef, Terminated, Actor } -import org.apache.camel.Endpoint -import akka.camel._ -import scala.util.control.NonFatal - -/** - * Watches the end of life of Producers. - * Removes a Producer from the ProducerRegistry when it is Terminated, - * which in turn stops the SendProcessor. - * - * INTERNAL API - */ -private class ProducerWatcher(registry: ProducerRegistry) extends Actor { - override def receive = { - case RegisterProducer(actorRef) ⇒ context.watch(actorRef) - case Terminated(actorRef) ⇒ registry.unregisterProducer(actorRef) - } -} - -/** - * INTERNAL API - */ -private case class RegisterProducer(actorRef: ActorRef) - -/** - * For internal use only. - * Manages the Camel objects for Producers. - * Every Producer needs an Endpoint and a SendProcessor - * to produce messages over an Exchange. - */ -private[camel] trait ProducerRegistry { this: Camel ⇒ - private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]() - private val watcher = system.actorOf(Props(new ProducerWatcher(this))) //FIXME should this really be top level? - - private def registerWatch(actorRef: ActorRef): Unit = watcher ! RegisterProducer(actorRef) - - /** - * For internal use only. - * Unregisters Endpoint and SendProcessor and stops the SendProcessor - */ - private[camel] def unregisterProducer(actorRef: ActorRef): Unit = { - // Terminated cannot be sent before the actor is created in the processing of system messages. - Option(camelObjects.remove(actorRef)).foreach { - case (_, processor) ⇒ - try { - processor.stop() - system.eventStream.publish(EndpointDeActivated(actorRef)) - } catch { - case NonFatal(e) ⇒ system.eventStream.publish(EndpointFailedToDeActivate(actorRef, e)) - } - } - } - - /** - * For internal use only. - * Creates Endpoint and SendProcessor and associates the actorRef to these. - * @param actorRef the actorRef of the Producer actor. - * @param endpointUri the endpoint Uri of the producer - * @return Endpoint and SendProcessor registered for the actorRef - */ - private[camel] def registerProducer(actorRef: ActorRef, endpointUri: String): (Endpoint, SendProcessor) = { - try { - val endpoint = context.getEndpoint(endpointUri) - val processor = new SendProcessor(endpoint) - - camelObjects.putIfAbsent(actorRef, (endpoint, processor)) match { - case null ⇒ - processor.start() - registerWatch(actorRef) - system.eventStream.publish(EndpointActivated(actorRef)) - (endpoint, processor) - case prev ⇒ prev - } - } catch { - case NonFatal(e) ⇒ { - system.eventStream.publish(EndpointFailedToActivate(actorRef, e)) - // can't return null to the producer actor, so blow up actor in initialization. - throw e //FIXME I'm not a huge fan of log-rethrow, either log or rethrow - } - } - } -} \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 61c4cdb1fb..2a9ab2155f 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -15,7 +15,7 @@ import scala.concurrent.util.duration._ import scala.concurrent.util.Duration import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NonFatal -import java.util.concurrent.{ TimeoutException, CountDownLatch } +import java.util.concurrent.{ TimeUnit, TimeoutException, CountDownLatch } import akka.util.Timeout import akka.camel.internal.CamelExchangeAdapter import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage } @@ -111,7 +111,7 @@ private[camel] trait ActorEndpointConfig { private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) extends DefaultProducer(endpoint) with AsyncProcessor { /** * Processes the exchange. - * Calls the asynchronous version of the method and waits for the result (blocking). + * Calls the synchronous version of the method and waits for the result (blocking). * @param exchange the exchange to process */ def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange)) @@ -131,13 +131,11 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex /** * For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]] * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]] - * - * WARNING UNBOUNDED BLOCKING AWAITS */ private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = { val isDone = new CountDownLatch(1) processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } }) - isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that. + isDone.await(camel.settings.replyTimeout.toMillis, TimeUnit.MILLISECONDS) } /** diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala index 3cef3d285a..77526dab08 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumerActor.scala @@ -6,7 +6,8 @@ package akka.camel.javaapi import akka.actor.UntypedActor import akka.camel._ -import org.apache.camel.{ ProducerTemplate, CamelContext } +import org.apache.camel.ProducerTemplate +import org.apache.camel.impl.DefaultCamelContext /** * Subclass this abstract class to create an MDB-style untyped consumer actor. This @@ -21,10 +22,10 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer { def getEndpointUri(): String /** - * ''Java API'': Returns the [[org.apache.camel.CamelContext]] + * ''Java API'': Returns the [[org.apache.camel.impl.DefaultCamelContext]] * @return the CamelContext */ - protected def getCamelContext(): CamelContext = camelContext + protected def getCamelContext(): DefaultCamelContext = camelContext /** * ''Java API'': Returns the [[org.apache.camel.ProducerTemplate]] diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index 38c4cf276a..cd353e04a0 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -7,6 +7,7 @@ package akka.camel.javaapi import akka.actor.UntypedActor import akka.camel._ import org.apache.camel.{ CamelContext, ProducerTemplate } +import org.apache.camel.impl.DefaultCamelContext /** * Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java. @@ -64,7 +65,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { /** * Returns the CamelContext. */ - def getCamelContext(): CamelContext = camel.context + def getCamelContext(): DefaultCamelContext = camel.context /** * Returns the ProducerTemplate. diff --git a/akka-camel/src/main/scala/akka/package.scala b/akka-camel/src/main/scala/akka/package.scala index e1f2c0756e..9347580e76 100644 --- a/akka-camel/src/main/scala/akka/package.scala +++ b/akka-camel/src/main/scala/akka/package.scala @@ -17,4 +17,4 @@ package object camel { * }}} */ implicit def toActorRouteDefinition[T <: ProcessorDefinition[T]](definition: ProcessorDefinition[T]) = new ActorRouteDefinition(definition) -} \ No newline at end of file +} diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 466f213092..0c9aad7e23 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -4,23 +4,20 @@ package akka.camel; -import static org.junit.Assert.assertEquals; - -import java.util.concurrent.TimeUnit; - -import org.junit.AfterClass; -import org.junit.Test; - -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.util.Duration; -import scala.concurrent.util.FiniteDuration; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.testkit.AkkaSpec; import akka.testkit.JavaTestKit; - +import akka.util.Timeout; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.util.Duration; +import org.junit.AfterClass; +import org.junit.Test; +import java.util.concurrent.TimeUnit; +import akka.testkit.AkkaSpec; +import scala.concurrent.util.FiniteDuration; +import static org.junit.Assert.*; /** * @author Martin Krasser */ @@ -33,30 +30,27 @@ public class ConsumerJavaTestBase { system.shutdown(); } - @Test - public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() - throws Exception { - new JavaTestKit(system) { - { - String result = new EventFilter(Exception.class) { - protected String run() { - FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS); - Camel camel = CamelExtension.get(system); - ExecutionContext executionContext = system.dispatcher(); - try { - @SuppressWarnings("unused") - ActorRef ref = Await.result(camel.activationFutureFor( - system.actorOf(new Props(SampleErrorHandlingConsumer.class)), - timeout, executionContext), timeout); - return camel.template().requestBody( - "direct:error-handler-test-java", "hello", String.class); - } catch (Exception e) { - return e.getMessage(); - } - } - }.occurrences(1).exec(); - assertEquals("error: hello", result); - } - }; - } + @Test + public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { + new JavaTestKit(system) {{ + String result = new EventFilter(Exception.class) { + protected String run() { + FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS); + Timeout timeout = new Timeout(duration); + Camel camel = CamelExtension.get(system); + ExecutionContext executionContext = system.dispatcher(); + try { + Await.result( + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class), "sample-error-handling-consumer"), timeout, executionContext), + duration); + return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); + } + catch (Exception e) { + return e.getMessage(); + } + } + }.occurrences(1).exec(); + assertEquals("error: hello", result); + }}; + } } diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index db26c9e8ab..77b0294f60 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -4,10 +4,10 @@ import akka.actor.*; import akka.camel.internal.component.CamelPath; import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedProducerActor; +import akka.util.Timeout; import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; -import scala.concurrent.util.FiniteDuration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; import org.apache.camel.Predicate; @@ -16,6 +16,8 @@ import org.apache.camel.component.mock.MockEndpoint; import org.junit.Before; import org.junit.After; import org.junit.Test; +import scala.concurrent.util.FiniteDuration; + import java.util.concurrent.TimeUnit; public class CustomRouteTestBase { @@ -60,11 +62,12 @@ public class CustomRouteTestBase { @Test public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); - FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS); + Timeout timeout = new Timeout(duration); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext), - timeout); + duration); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.template().sendBody("direct:testRouteConsumer", "test"); assertMockEndpoint(mockEndpoint); @@ -74,15 +77,16 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); - FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS); + Timeout timeout = new Timeout(duration); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor( system.actorOf( new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), timeout, executionContext), - timeout); - camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); + duration); + camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, duration)); camel.template().sendBody("direct:testAck", "test"); assertMockEndpoint(mockEndpoint); system.stop(consumer); @@ -92,11 +96,12 @@ public class CustomRouteTestBase { public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); ExecutionContext executionContext = system.dispatcher(); - FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS); + Timeout timeout = new Timeout(duration); ActorRef consumer = Await.result( - camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), + camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), timeout, executionContext), - timeout); + duration); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.template().sendBody("direct:testAckFromUri", "test"); assertMockEndpoint(mockEndpoint); @@ -105,12 +110,13 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { - FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration duration = Duration.create(10, TimeUnit.SECONDS); + Timeout timeout = new Timeout(duration); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), timeout, executionContext), - timeout); + duration); camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.template().sendBody("direct:testException", "test"); } diff --git a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java index ba483e7a13..95cdc5007b 100644 --- a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java @@ -5,6 +5,7 @@ package akka.camel; import akka.actor.ActorSystem; +import akka.dispatch.Mapper; import akka.japi.Function; import org.apache.camel.NoTypeConversionAvailableException; import org.junit.AfterClass; @@ -48,12 +49,6 @@ public class MessageJavaTestBase { message(1.4).getBodyAs(InputStream.class,camel.context()); } - - @Test public void shouldReturnDoubleHeader() { - CamelMessage message = message("test" , createMap("test", 1.4)); - assertEquals(1.4, message.getHeader("test")); - } - @Test public void shouldConvertDoubleHeaderToString() { CamelMessage message = message("test" , createMap("test", 1.4)); assertEquals("1.4", message.getHeaderAs("test", String.class,camel.context())); @@ -105,24 +100,6 @@ public class MessageJavaTestBase { message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3"))); } - @Test public void shouldAddHeaderAndPreserveBodyAndHeaders() { - assertEquals( - message("test1" , createMap("A", "1", "B", "2")), - message("test1" , createMap("A", "1")).addHeader("B", "2")); - } - - @Test public void shouldAddHeadersAndPreserveBodyAndHeaders() { - assertEquals( - message("test1" , createMap("A", "1", "B", "2")), - message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2"))); - } - - @Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() { - assertEquals( - message("test1" , createMap("A", "1")), - message("test1" , createMap("A", "1", "B", "2")).withoutHeader("B")); - } - private static Set createSet(String... entries) { HashSet set = new HashSet(); set.addAll(Arrays.asList(entries)); @@ -137,7 +114,8 @@ public class MessageJavaTestBase { return map; } - private static class TestTransformer implements Function { + private static class TestTransformer extends Mapper { + @Override public String apply(String param) { return param + "b"; } diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java index e5603acec1..c654e3958d 100644 --- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java @@ -6,34 +6,38 @@ package akka.camel; import akka.actor.Status; import akka.camel.javaapi.UntypedConsumerActor; +import akka.dispatch.Mapper; import scala.concurrent.util.Duration; import org.apache.camel.builder.Builder; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; import scala.Option; +import scala.concurrent.util.FiniteDuration; /** * @author Martin Krasser */ public class SampleErrorHandlingConsumer extends UntypedConsumerActor { + private static Mapper> mapper = new Mapper>() { + public ProcessorDefinition apply(RouteDefinition rd) { + return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + } + }; public String getEndpointUri() { return "direct:error-handler-test-java"; } @Override - //TODO write test confirming this gets called in java - public ProcessorDefinition onRouteDefinition(RouteDefinition rd) { - return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + public Mapper> onRouteDefinition() { + return mapper; } @Override - public Duration replyTimeout(){ + public FiniteDuration replyTimeout(){ return Duration.create(1, "second"); } - - public void onReceive(Object message) throws Exception { CamelMessage msg = (CamelMessage) message; String body = msg.getBodyAs(String.class,this.getCamelContext()); diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java b/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java deleted file mode 100644 index e1c17900c3..0000000000 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedActor.java +++ /dev/null @@ -1,16 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camel; - -import akka.actor.UntypedActor; - -/** - * @author Martin Krasser - */ -public class SampleUntypedActor extends UntypedActor { - public void onReceive(Object message) { - System.out.println("Yay! I haz a message!"); - } -} diff --git a/akka-camel/src/test/resources/logback-test.xml b/akka-camel/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..1b571cf387 --- /dev/null +++ b/akka-camel/src/test/resources/logback-test.xml @@ -0,0 +1,23 @@ + + + + + + + %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n + + + + + target/akka-camel.log + true + + %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n + + + + + + + + diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index fe79bacdd8..54c671c3b5 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -10,21 +10,22 @@ import org.scalatest.matchers.MustMatchers import scala.concurrent.util.duration._ import org.apache.camel.ProducerTemplate import akka.actor._ -import akka.util.Timeout import TestSupport._ import org.scalatest.WordSpec import akka.testkit.TestLatch -import scala.concurrent.Await +import concurrent.Await import java.util.concurrent.TimeoutException +import akka.util.Timeout class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { - implicit val timeout = 10 seconds + val timeoutDuration = 10 seconds + implicit val timeout = Timeout(timeoutDuration) def template: ProducerTemplate = camel.template import system.dispatcher "ActivationAware must be notified when endpoint is activated" in { val latch = new TestLatch(0) - val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch))) + val actor = system.actorOf(Props(new TestConsumer("direct:actor-1", latch)), "act-direct-actor-1") Await.result(camel.activationFutureFor(actor), 10 seconds) must be === actor template.requestBody("direct:actor-1", "test") must be("received test") @@ -40,32 +41,39 @@ class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCa super.postStop() latch.countDown() } - }) - Await.result(camel.activationFutureFor(actor), timeout) + }, name = "direct-a3") + Await.result(camel.activationFutureFor(actor), timeoutDuration) system.stop(actor) - Await.result(camel.deactivationFutureFor(actor), timeout) - Await.ready(latch, 10 second) + Await.result(camel.deactivationFutureFor(actor), timeoutDuration) + Await.ready(latch, timeoutDuration) } "ActivationAware must time out when waiting for endpoint de-activation for too long" in { val latch = new TestLatch(0) - val actor = start(new TestConsumer("direct:a5", latch)) - Await.result(camel.activationFutureFor(actor), timeout) + val actor = start(new TestConsumer("direct:a5", latch), name = "direct-a5") + Await.result(camel.activationFutureFor(actor), timeoutDuration) intercept[TimeoutException] { Await.result(camel.deactivationFutureFor(actor), 1 millis) } } "activationFutureFor must fail if notification timeout is too short and activation is not complete yet" in { val latch = new TestLatch(1) - try { - val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch))) - intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) } - } finally latch.countDown() + val actor = system.actorOf(Props(new TestConsumer("direct:actor-4", latch)), "direct-actor-4") + intercept[TimeoutException] { Await.result(camel.activationFutureFor(actor), 1 millis) } + latch.countDown() + // after the latch is removed, complete the wait for completion so this test does not later on + // print errors because of the registerConsumer timing out. + Await.result(camel.activationFutureFor(actor), timeoutDuration) } class TestConsumer(uri: String, latch: TestLatch) extends Consumer { def endpointUri = uri - Await.ready(latch, 60 seconds) + + override def preStart() { + Await.ready(latch, 60 seconds) + super.preStart() + } + override def receive = { case msg: CamelMessage ⇒ sender ! "received " + msg.body } diff --git a/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala b/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala index c5dfd01d00..0fd0a73fb6 100644 --- a/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala +++ b/akka-camel/src/test/scala/akka/camel/CamelMessageTest.scala @@ -16,24 +16,11 @@ class CamelMessageTest extends MustMatchers with WordSpec with SharedCamelSystem "overwrite body and add header" in { val msg = sampleMessage - CamelMessage("blah", Map("key" -> "baz")).copyContentTo(msg) + CamelMessage.copyContent(CamelMessage("blah", Map("key" -> "baz")), msg) assert(msg.getBody === "blah") assert(msg.getHeader("foo") === "bar") assert(msg.getHeader("key") === "baz") } - - "create message with body and header" in { - val m = CamelMessage.from(sampleMessage) - assert(m.body === "test") - assert(m.headers("foo") === "bar") - } - - "create message with body and header and custom header" in { - val m = CamelMessage.from(sampleMessage, Map("key" -> "baz")) - assert(m.body === "test") - assert(m.headers("foo") === "bar") - assert(m.headers("key") === "baz") - } } private[camel] def sampleMessage = { diff --git a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala new file mode 100644 index 0000000000..d4983bbf6a --- /dev/null +++ b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala @@ -0,0 +1,159 @@ +package akka.camel + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.camel.TestSupport.NonSharedCamelSystem +import akka.actor.{ ActorRef, Props, Actor } +import akka.routing.BroadcastRouter +import concurrent.{ Promise, Await, Future } +import scala.concurrent.util.duration._ +import language.postfixOps +import akka.testkit._ +import akka.util.Timeout +import org.apache.camel.model.{ProcessorDefinition, RouteDefinition} +import org.apache.camel.builder.Builder + +/** + * A test to concurrently register and de-register consumer and producer endpoints + */ +class ConcurrentActivationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { + + "Activation" must { + "support concurrent registrations and de-registrations" in { + implicit val timeout = Timeout(10 seconds) + val timeoutDuration = timeout.duration + implicit val ec = system.dispatcher + val number = 10 + filterEvents(EventFilter.warning(pattern = "received dead letter from .*producerRegistrar.*", occurrences = number*number)) { + // A ConsumerBroadcast creates 'number' amount of ConsumerRegistrars, which will register 'number' amount of endpoints, + // in total number*number endpoints, activating and deactivating every endpoint. + // a promise to the list of registrars, which have a list of actorRefs each. A tuple of a list of activated refs and a list of deactivated refs + val promiseRegistrarLists = Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]() + // future to all the futures of activation and deactivation + val futureRegistrarLists = promiseRegistrarLists.future + + val ref = system.actorOf(Props(new ConsumerBroadcast(promiseRegistrarLists)), name = "broadcaster") + // create the registrars + ref ! CreateRegistrars(number) + // send a broadcast to all registrars, so that number * number messages are sent + // every Register registers a consumer and a producer + (1 to number).map(i ⇒ ref ! RegisterConsumersAndProducers("direct:concurrent-")) + // de-register all consumers and producers + ref ! DeRegisterConsumersAndProducers() + + val promiseAllRefs = Promise[(List[ActorRef], List[ActorRef])]() + val allRefsFuture = promiseAllRefs.future + // map over all futures, put all futures in one list of activated and deactivated actor refs. + futureRegistrarLists.map { + case (futureActivations, futureDeactivations) ⇒ + futureActivations zip futureDeactivations map { + case (activations, deactivations) ⇒ + promiseAllRefs.success( (activations.flatten, deactivations.flatten)) + } + } + val (activations, deactivations) = Await.result(allRefsFuture, timeoutDuration) + // must be the size of the activated activated producers and consumers + activations.size must be (2 * number * number) + // must be the size of the activated activated producers and consumers + deactivations.size must be ( 2* number * number ) + def partitionNames(refs:Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer")) + def assertContainsSameElements(lists:(Seq[_], Seq[_])) { + val (a,b) = lists + a.intersect(b).size must be (a.size) + } + val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations) + val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations) + assertContainsSameElements(activatedConsumerNames, deactivatedConsumerNames) + assertContainsSameElements(activatedProducerNames, deactivatedProducerNames) + } + } + } + +} +class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]) extends Actor { + private var broadcaster: Option[ActorRef] = None + private implicit val ec = context.dispatcher + def receive = { + case CreateRegistrars(number) ⇒ + var allActivationFutures = List[Future[List[ActorRef]]]() + var allDeactivationFutures = List[Future[List[ActorRef]]]() + + val routees = (1 to number).map { i ⇒ + val activationListPromise = Promise[List[ActorRef]]() + val deactivationListPromise = Promise[List[ActorRef]]() + val activationListFuture = activationListPromise.future + val deactivationListFuture = deactivationListPromise.future + + allActivationFutures = allActivationFutures :+ activationListFuture + allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture + context.actorOf(Props(new Registrar(i, number, activationListPromise, deactivationListPromise)), s"registrar-$i") + } + promise.success((Future.sequence(allActivationFutures)), Future.sequence(allDeactivationFutures)) + + broadcaster = Some(context.actorOf(Props[Registrar] withRouter (BroadcastRouter(routees)), "registrarRouter")) + case reg: Any ⇒ + broadcaster.foreach(_.forward(reg)) + } +} + +case class CreateRegistrars(number: Int) +case class RegisterConsumersAndProducers(endpointUri: String) +case class DeRegisterConsumersAndProducers() +case class Activations() +case class DeActivations() + +class Registrar(val start: Int, val number: Int, activationsPromise: Promise[List[ActorRef]], + deActivationsPromise: Promise[List[ActorRef]]) extends Actor { + private var actorRefs = Set[ActorRef]() + private var activations = Set[Future[ActorRef]]() + private var deActivations = Set[Future[ActorRef]]() + private var index = 0 + private val camel = CamelExtension(context.system) + private implicit val ec = context.dispatcher + private implicit val timeout = Timeout(10 seconds) + + def receive = { + case reg: RegisterConsumersAndProducers ⇒ + val i = index + add(new EchoConsumer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-echo-consumer$start-$i") + add(new TestProducer(s"${reg.endpointUri}$start-$i"), s"concurrent-test-producer-$start-$i") + index = index + 1 + if (activations.size == number * 2) { + Future.sequence(activations.toList) map activationsPromise.success + } + case reg: DeRegisterConsumersAndProducers ⇒ + actorRefs.foreach { aref ⇒ + context.stop(aref) + deActivations = deActivations + camel.deactivationFutureFor(aref) + if (deActivations.size == number * 2) { + Future.sequence(deActivations.toList) map deActivationsPromise.success + } + } + } + + def add(actor: =>Actor, name:String) { + val ref = context.actorOf(Props(actor), name) + actorRefs = actorRefs + ref + activations = activations + camel.activationFutureFor(ref) + } +} + +class EchoConsumer(endpoint: String) extends Actor with Consumer { + + def endpointUri = endpoint + + def receive = { + case msg: CamelMessage ⇒ sender ! msg + } + + /** + * Returns the route definition handler for creating a custom route to this consumer. + * By default it returns an identity function, override this method to + * return a custom route definition handler. + */ + override def onRouteDefinition = (rd: RouteDefinition) => rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end +} + +class TestProducer(uri: String) extends Actor with Producer { + def endpointUri = uri +} diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 6ffacf3432..0de66ae082 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -4,6 +4,7 @@ package akka.camel +import internal.ActorActivationException import language.postfixOps import language.existentials @@ -11,7 +12,7 @@ import akka.actor._ import org.scalatest.matchers.MustMatchers import org.scalatest.WordSpec import akka.camel.TestSupport._ -import org.apache.camel.model.RouteDefinition +import org.apache.camel.model.{ ProcessorDefinition, RouteDefinition } import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } @@ -19,17 +20,19 @@ import akka.actor.Status.Failure import scala.concurrent.util.duration._ import concurrent.{ ExecutionContext, Await } import akka.testkit._ +import akka.util.Timeout class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { - implicit val defaultTimeout = 10.seconds + val defaultTimeoutDuration = 10 seconds + implicit val defaultTimeout = Timeout(defaultTimeoutDuration) implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) { + filterEvents(EventFilter[ActorActivationException](occurrences = 1)) { val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) intercept[FailedToCreateRouteException] { - Await.result(camel.activationFutureFor(actorRef), defaultTimeout) + Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration) } } } @@ -40,7 +43,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC def receive = { case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] } - }) + }, name = "direct-a1") camel.sendTo("direct:a1", msg = "some message") must be("received some message") } @@ -48,17 +51,18 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC val SHORT_TIMEOUT = 10 millis val LONG_WAIT = 200 millis - start(new Consumer { + val ref = start(new Consumer { override def replyTimeout = SHORT_TIMEOUT def endpointUri = "direct:a3" def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } } - }) + }, name = "ignore-this-deadletter-timeout-consumer-reply") val exception = intercept[CamelExecutionException] { camel.sendTo("direct:a3", msg = "some msg 3") } exception.getCause.getClass must be(classOf[TimeoutException]) + stop(ref) } "Consumer must process messages even after actor restart" in { @@ -74,90 +78,105 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC override def postRestart(reason: Throwable) { restarted.countDown() } - }) + }, "direct-a2") filterEvents(EventFilter[TestException](occurrences = 1)) { consumer ! "throw" - Await.ready(restarted, defaultTimeout) + Await.ready(restarted, defaultTimeoutDuration) camel.sendTo("direct:a2", msg = "xyz") must be("received xyz") } + stop(consumer) } "Consumer must unregister itself when stopped" in { - val consumer = start(new TestActor()) - Await.result(camel.activationFutureFor(consumer), defaultTimeout) + val consumer = start(new TestActor(), name = "test-actor-unregister") + Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration) camel.routeCount must be > (0) system.stop(consumer) - Await.result(camel.deactivationFutureFor(consumer), defaultTimeout) + Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration) camel.routeCount must be(0) } "Consumer must register on uri passed in through constructor" in { - val consumer = start(new TestActor("direct://test")) - Await.result(camel.activationFutureFor(consumer), defaultTimeout) + val consumer = start(new TestActor("direct://test"), name = "direct-test") + Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration) camel.routeCount must be > (0) camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test") system.stop(consumer) - Await.result(camel.deactivationFutureFor(consumer), defaultTimeout) + Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration) camel.routeCount must be(0) + stop(consumer) } "Error passing consumer supports error handling through route modification" in { - start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { - override def onRouteDefinition(rd: RouteDefinition) = { + val ref = start(new ErrorThrowingConsumer("direct:error-handler-test") { + override def onRouteDefinition = (rd: RouteDefinition) ⇒ { rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end } - }) + }, name = "direct-error-handler-test") filterEvents(EventFilter[TestException](occurrences = 1)) { camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") } + stop(ref) } "Error passing consumer supports redelivery through route modification" in { - start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing { - override def onRouteDefinition(rd: RouteDefinition) = { + val ref = start(new FailingOnceConsumer("direct:failing-once-concumer") { + override def onRouteDefinition = (rd: RouteDefinition) ⇒ { rd.onException(classOf[TestException]).maximumRedeliveries(1).end } - }) + }, name = "direct-failing-once-consumer") filterEvents(EventFilter[TestException](occurrences = 1)) { camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") } + stop(ref) } "Consumer supports manual Ack" in { - start(new ManualAckConsumer() { + val ref = start(new ManualAckConsumer() { def endpointUri = "direct:manual-ack" def receive = { case _ ⇒ sender ! Ack } - }) - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout + }, name = "direct-manual-ack-1") + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) must be(null) //should not timeout + stop(ref) } "Consumer handles manual Ack failure" in { val someException = new Exception("e1") - start(new ManualAckConsumer() { + val ref = start(new ManualAckConsumer() { def endpointUri = "direct:manual-ack" def receive = { case _ ⇒ sender ! Failure(someException) } - }) + }, name = "direct-manual-ack-2") intercept[ExecutionException] { - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) }.getCause.getCause must be(someException) + stop(ref) } "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in { - start(new ManualAckConsumer() { + val ref = start(new ManualAckConsumer() { override def replyTimeout = 10 millis def endpointUri = "direct:manual-ack" def receive = { case _ ⇒ } - }) + }, name = "direct-manual-ack-3") intercept[ExecutionException] { - camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeout.toSeconds, TimeUnit.SECONDS) + camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) }.getCause.getCause.getMessage must include("Failed to get Ack") + stop(ref) + } + "respond to onRouteDefinition" in { + val ref = start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), "error-responding-consumer") + filterEvents(EventFilter[TestException](occurrences = 1)) { + val response = camel.sendTo("direct:error-responding-consumer-1", "some body") + response must be("some body has an error") + } + stop(ref) } } } @@ -166,6 +185,25 @@ class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer { def receive = { case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body) } + override def preRestart(reason: Throwable, message: Option[Any]) { + super.preRestart(reason, message) + sender ! Failure(reason) + } +} + +class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer { + def receive = { + case msg: CamelMessage ⇒ throw new TestException("Error!") + } + override def onRouteDefinition = (rd: RouteDefinition) ⇒ { + // Catch TestException and handle it by returning a modified version of the in message + rd.onException(classOf[TestException]).handled(true).transform(Builder.body.append(" has an error")).end + } + + final override def preRestart(reason: Throwable, message: Option[Any]) { + super.preRestart(reason, message) + sender ! Failure(reason) + } } class FailingOnceConsumer(override val endpointUri: String) extends Consumer { @@ -177,6 +215,11 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer { else throw new TestException("rejected: %s" format msg.body) } + + final override def preRestart(reason: Throwable, message: Option[Any]) { + super.preRestart(reason, message) + sender ! Failure(reason) + } } class TestActor(uri: String = "file://target/abcde") extends Consumer { @@ -184,13 +227,6 @@ class TestActor(uri: String = "file://target/abcde") extends Consumer { def receive = { case _ ⇒ /* do nothing */ } } -trait ErrorPassing { - self: Actor ⇒ - final override def preRestart(reason: Throwable, message: Option[Any]) { - sender ! Failure(reason) - } -} - trait ManualAckConsumer extends Consumer { override def autoAck = false } diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala index dab91623d4..c649424122 100644 --- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala +++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala @@ -14,6 +14,8 @@ import org.scalatest.WordSpec import akka.event.LoggingAdapter import akka.actor.ActorSystem.Settings import com.typesafe.config.ConfigFactory +import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.spi.Registry class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { @@ -26,7 +28,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers def camelWithMocks = new DefaultCamel(sys) { override val log = mock[LoggingAdapter] override lazy val template = mock[ProducerTemplate] - override lazy val context = mock[CamelContext] + override lazy val context = mock[DefaultCamelContext] override val settings = mock[CamelSettings] } @@ -62,5 +64,4 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers verify(camel.context).stop() } - } \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala index 6285f13561..dd73027624 100644 --- a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala @@ -23,11 +23,6 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem } } - test("mustReturnDoubleHeader") { - val message = CamelMessage("test", Map("test" -> 1.4)) - message.header("test").get must be(1.4) - } - test("mustConvertDoubleHeaderToString") { val message = CamelMessage("test", Map("test" -> 1.4)) message.headerAs[String]("test").get must be("1.4") @@ -47,32 +42,14 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem } test("mustSetBodyAndPreserveHeaders") { - CamelMessage("test1", Map("A" -> "1")).withBody("test2") must be( + CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be( CamelMessage("test2", Map("A" -> "1"))) } test("mustSetHeadersAndPreserveBody") { - CamelMessage("test1", Map("A" -> "1")).withHeaders(Map("C" -> "3")) must be( + CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be( CamelMessage("test1", Map("C" -> "3"))) } - - test("mustAddHeaderAndPreserveBodyAndHeaders") { - CamelMessage("test1", Map("A" -> "1")).addHeader("B" -> "2") must be( - CamelMessage("test1", Map("A" -> "1", "B" -> "2"))) - - } - - test("mustAddHeadersAndPreserveBodyAndHeaders") { - CamelMessage("test1", Map("A" -> "1")).addHeaders(Map("B" -> "2")) must be( - CamelMessage("test1", Map("A" -> "1", "B" -> "2"))) - - } - - test("mustRemoveHeadersAndPreserveBodyAndRemainingHeaders") { - CamelMessage("test1", Map("A" -> "1", "B" -> "2")).withoutHeader("B") must be( - CamelMessage("test1", Map("A" -> "1"))) - - } } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 6510f6fd67..c8e5902093 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -31,7 +31,6 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val camelContext = camel.context // to make testing equality of messages easier, otherwise the breadcrumb shows up in the result. camelContext.setUseBreadcrumb(false) - val timeoutDuration = 1 second implicit val timeout = Timeout(timeoutDuration) override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) } @@ -40,7 +39,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "A Producer on a sync Camel route" must { "produce a message and receive normal response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true))) + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")) @@ -48,6 +47,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd case result: CamelMessage ⇒ assert(result === expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } + system.stop(producer) } "produce a message and receive failure response" in { @@ -68,7 +68,7 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: AkkaCamelException ⇒ Stop } - })) + }), name = "prod-anonymous-supervisor") val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -78,27 +78,32 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) + system.stop(producer) } "produce a message oneway" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway)) + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "direct-producer-1-oneway") mockEndpoint.expectedBodiesReceived("TEST") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() + system.stop(producer) } "produces message twoway without sender reference" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1"))) + // this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used + // to communicate with it and the response is ignored, which ends up in a dead letter + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "ignore-this-deadletter-direct-producer-test-no-sender") mockEndpoint.expectedBodiesReceived("test") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() + system.stop(producer) } } "A Producer on an async Camel route" must { "produce message to direct:producer-test-3 and receive normal response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) @@ -109,10 +114,11 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } + system.stop(producer) } "produce message to direct:producer-test-3 and receive failure response" in { - val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) + val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3-receive-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -120,11 +126,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } + system.stop(producer) } "produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) + val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) @@ -135,11 +142,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } + system.stop(target) + system.stop(producer) } "produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) + val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -147,30 +156,36 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } + system.stop(target) + system.stop(producer) } "produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) + val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder-to-producing-target") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() + system.stop(target) + system.stop(producer) } "produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) + val target = system.actorOf(Props[ProducingForwardTarget], name = "producer-forwarding-target-failure") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forward-failure") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) mockEndpoint.assertIsSatisfied() } + system.stop(target) + system.stop(producer) } "produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) + val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) @@ -180,11 +195,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd message must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } + system.stop(target) + system.stop(producer) } "produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in { - val target = system.actorOf(Props[ReplyingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) + val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -192,19 +209,23 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } + system.stop(target) + system.stop(producer) } "produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) + val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-normal") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-normal") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() + system.stop(target) + system.stop(producer) } "produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - val target = system.actorOf(Props[ProducingForwardTarget]) - val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) + val target = system.actorOf(Props[ProducingForwardTarget], "producing-forward-target-failure") + val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-forward-failure-producing-target") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) @@ -253,7 +274,7 @@ object ProducerFeatureTest { class ReplyingForwardTarget extends Actor { def receive = { case msg: CamelMessage ⇒ - context.sender ! (msg.addHeader("test" -> "result")) + context.sender ! (msg.copy(headers = msg.headers + ("test" -> "result"))) case msg: akka.actor.Status.Failure ⇒ msg.cause match { case e: AkkaCamelException ⇒ context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure"))) diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala deleted file mode 100644 index 4d46544b99..0000000000 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camel - -import language.postfixOps - -import org.scalatest.matchers.MustMatchers -import org.scalatest.WordSpec -import akka.camel.TestSupport.SharedCamelSystem -import scala.concurrent.util.duration._ -import akka.actor.{ ActorRef, Props } -import akka.util.Timeout -import scala.concurrent.Await - -class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { - implicit val timeout = 5.seconds - implicit val ec = system.dispatcher - - "A ProducerRegistry" must { - def newEmptyActor: ActorRef = system.actorOf(Props.empty) - def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 - - "register a started SendProcessor for the producer, which is stopped when the actor is stopped" in { - val actorRef = newEmptyActor - val processor = registerProcessorFor(actorRef) - Await.result(camel.activationFutureFor(actorRef), timeout) - processor.isStarted must be(true) - system.stop(actorRef) - Await.result(camel.deactivationFutureFor(actorRef), timeout) - (processor.isStopping || processor.isStopped) must be(true) - } - "remove and stop the SendProcessor if the actorRef is registered" in { - val actorRef = newEmptyActor - val processor = registerProcessorFor(actorRef) - camel.unregisterProducer(actorRef) - (processor.isStopping || processor.isStopped) must be(true) - } - "remove and stop only the SendProcessor for the actorRef that is registered" in { - val actorRef1 = newEmptyActor - val actorRef2 = newEmptyActor - val processor = registerProcessorFor(actorRef2) - // this generates a warning on the activationTracker. - camel.unregisterProducer(actorRef1) - (!processor.isStopped && !processor.isStopping) must be(true) - - camel.unregisterProducer(actorRef2) - (processor.isStopping || processor.isStopped) must be(true) - } - } - -} \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index fe22b0e7a0..c25ccdab3c 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -19,11 +19,12 @@ import akka.util.Timeout import akka.testkit.AkkaSpec private[camel] object TestSupport { + def start(actor: ⇒ Actor, name: String)(implicit system: ActorSystem, timeout: Timeout): ActorRef = + Await.result(CamelExtension(system).activationFutureFor(system.actorOf(Props(actor), name))(timeout, system.dispatcher), timeout.duration) - def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = { - val actorRef = system.actorOf(Props(actor)) - Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds) - actorRef + def stop(actorRef: ActorRef)(implicit system: ActorSystem, timeout: Timeout) { + system.stop(actorRef) + Await.result(CamelExtension(system).deactivationFutureFor(actorRef)(timeout, system.dispatcher), timeout.duration) } private[camel] implicit def camelToTestWrapper(camel: Camel) = new CamelTestWrapper(camel) diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala index ba853083bf..a9d097aa10 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala @@ -33,7 +33,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter "An UntypedProducer producing a message to a sync Camel route" must { "produce a message and receive a normal response" in { - val producer = system.actorOf(Props[SampleUntypedReplyingProducer]) + val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeout) @@ -47,7 +47,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter } "produce a message and receive a failure response" in { - val producer = system.actorOf(Props[SampleUntypedReplyingProducer]) + val producer = system.actorOf(Props[SampleUntypedReplyingProducer], name = "sample-untyped-replying-producer-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { @@ -66,7 +66,7 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter "An UntypedProducer producing a message to a sync Camel route and then forwarding the response" must { "produce a message and send a normal response to direct:forward-test-1" in { - val producer = system.actorOf(Props[SampleUntypedForwardingProducer]) + val producer = system.actorOf(Props[SampleUntypedForwardingProducer], name = "sample-untyped-forwarding-producer") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map[String, Any]()), producer) diff --git a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala index ddff6017ec..b290ba254b 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala @@ -5,9 +5,9 @@ import org.scalatest.matchers.MustMatchers import scala.concurrent.util.duration._ import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } import akka.actor.{ Props, ActorSystem } -import scala.concurrent.util.Duration import akka.camel._ import akka.testkit.{ TimingTest, TestProbe, TestKit } +import akka.camel.internal.ActivationProtocol._ import scala.concurrent.util.FiniteDuration class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { @@ -25,7 +25,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w anotherAwaiting = new Awaiting(actor) } - val at = system.actorOf(Props[ActivationTracker]) + val at = system.actorOf(Props[ActivationTracker], name = "activationTrackker") "ActivationTracker forwards activation message to all awaiting parties" taggedAs TimingTest in { awaiting.awaitActivation() diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 09f9431cc9..3a7cca93e2 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -11,7 +11,7 @@ import org.mockito.Mockito._ import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback } import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.util.duration._ -import scala.concurrent.util.Duration +import concurrent.util.{ FiniteDuration, Duration } import java.lang.String import akka.actor.{ ActorRef, Props, ActorSystem, Actor } import akka.camel._ @@ -21,14 +21,17 @@ import akka.camel.TestSupport._ import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } import org.mockito.{ ArgumentMatcher, Matchers, Mockito } import org.scalatest.matchers.MustMatchers -import akka.actor.Status.Failure +import akka.actor.Status.{ Success, Failure } import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter import akka.testkit.{ TimingTest, TestKit, TestProbe } -import scala.concurrent.util.FiniteDuration +import org.apache.camel.impl.DefaultCamelContext +import concurrent.{ Await, Promise, Future } +import akka.util.Timeout class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { + implicit val timeout = Timeout(10 seconds) "ActorProducer" when { @@ -54,7 +57,45 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "not expect response and not block" taggedAs TimingTest in { time(producer.processExchangeAdapter(exchange)) must be < (200 millis) } + } + "manualAck" when { + + "response is Ack" must { + "process the exchange" in { + producer = given(outCapable = false, autoAck = false) + import system.dispatcher + val future = Future { + producer.processExchangeAdapter(exchange) + } + within(1 second) { + probe.expectMsgType[CamelMessage] + info("message sent to consumer") + probe.sender ! Ack + } + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") + Await.ready(future, timeout.duration) + } + } + "the consumer does not respond wit Ack" must { + "not block forever" in { + producer = given(outCapable = false, autoAck = false) + import system.dispatcher + val future = Future { + producer.processExchangeAdapter(exchange) + } + within(1 second) { + probe.expectMsgType[CamelMessage] + info("message sent to consumer") + } + verify(exchange, never()).setResponse(any[CamelMessage]) + info("no response forwarded to exchange") + intercept[TimeoutException] { + Await.ready(future, camel.settings.replyTimeout - (1 seconds)) + } + } + } } "out capable" when { @@ -93,9 +134,6 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with } } - - //TODO: write more tests for synchronous process(exchange) method - } "asynchronous" when { @@ -284,8 +322,21 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo def camelWithMocks = new DefaultCamel(sys) { override val log = mock[LoggingAdapter] override lazy val template = mock[ProducerTemplate] - override lazy val context = mock[CamelContext] - override val settings = mock[CamelSettings] + override lazy val context = mock[DefaultCamelContext] + override val settings = new CamelSettings(ConfigFactory.parseString( + """ + akka { + camel { + jmx = off + streamingCache = on + consumer { + auto-ack = on + reply-timeout = 2s + activation-timeout = 10s + } + } + } + """)) } camel = camelWithMocks @@ -351,6 +402,6 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo def receive = { case msg ⇒ sender ! "received " + msg } - })) + }), name = "echoActor") } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala index c66a77c287..e8918a5b67 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala @@ -10,7 +10,8 @@ import org.scalatest.matchers.MustMatchers import scala.concurrent.util.duration._ import scala.concurrent.util.Duration import org.scalatest.WordSpec -import org.apache.camel.{ TypeConversionException, NoTypeConversionAvailableException } +import org.apache.camel.TypeConversionException +import language.postfixOps class DurationConverterSpec extends WordSpec with MustMatchers { import DurationTypeConverter._ diff --git a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java index e0b9b60c2b..5c406e0fb4 100644 --- a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java @@ -6,6 +6,7 @@ package docs.camel; import akka.camel.Camel; import akka.camel.CamelExtension; import akka.camel.javaapi.UntypedConsumerActor; + import akka.util.Timeout; import scala.concurrent.Future; import scala.concurrent.util.Duration; import scala.concurrent.util.FiniteDuration; @@ -26,14 +27,14 @@ public class ActivationTestBase { ActorRef producer = system.actorOf(props,"myproducer"); Camel camel = CamelExtension.get(system); // get a future reference to the activation of the endpoint of the Consumer Actor - FiniteDuration duration = Duration.create(10, SECONDS); - Future activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher()); + Timeout timeout = new Timeout(Duration.create(10, SECONDS)); + Future activationFuture = camel.activationFutureFor(producer, timeout, system.dispatcher()); //#CamelActivation //#CamelDeactivation // .. system.stop(producer); // get a future reference to the deactivation of the endpoint of the Consumer Actor - Future deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher()); + Future deactivationFuture = camel.deactivationFutureFor(producer, timeout, system.dispatcher()); //#CamelDeactivation system.shutdown(); } diff --git a/akka-docs/rst/java/code/docs/camel/Consumer4.java b/akka-docs/rst/java/code/docs/camel/Consumer4.java index 960b523f3a..d1400b2e2f 100644 --- a/akka-docs/rst/java/code/docs/camel/Consumer4.java +++ b/akka-docs/rst/java/code/docs/camel/Consumer4.java @@ -11,7 +11,7 @@ public class Consumer4 extends UntypedConsumerActor { private final static FiniteDuration timeout = Duration.create(500, TimeUnit.MILLISECONDS); @Override - public Duration replyTimeout() { + public FiniteDuration replyTimeout() { return timeout; } diff --git a/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java b/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java index 9a01717287..ebf49dab11 100644 --- a/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java +++ b/akka-docs/rst/java/code/docs/camel/ErrorThrowingConsumer.java @@ -3,6 +3,7 @@ package docs.camel; import akka.actor.Status; import akka.camel.CamelMessage; import akka.camel.javaapi.UntypedConsumerActor; +import akka.dispatch.Mapper; import org.apache.camel.builder.Builder; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; @@ -11,6 +12,13 @@ import scala.Option; public class ErrorThrowingConsumer extends UntypedConsumerActor{ private String uri; + private static Mapper> mapper = new Mapper>() { + public ProcessorDefinition apply(RouteDefinition rd) { + // Catch any exception and handle it by returning the exception message as response + return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + } + }; + public ErrorThrowingConsumer(String uri){ this.uri = uri; } @@ -29,9 +37,8 @@ public class ErrorThrowingConsumer extends UntypedConsumerActor{ } @Override - public ProcessorDefinition onRouteDefinition(RouteDefinition rd) { - // Catch any exception and handle it by returning the exception message as response - return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + public Mapper> getRouteDefinitionHandler() { + return mapper; } @Override diff --git a/akka-docs/rst/java/code/docs/camel/Responder.java b/akka-docs/rst/java/code/docs/camel/Responder.java index 12ca8603cf..8750aaf700 100644 --- a/akka-docs/rst/java/code/docs/camel/Responder.java +++ b/akka-docs/rst/java/code/docs/camel/Responder.java @@ -2,6 +2,7 @@ package docs.camel; //#CustomRoute import akka.actor.UntypedActor; import akka.camel.CamelMessage; +import akka.dispatch.Mapper; import akka.japi.Function; public class Responder extends UntypedActor{ @@ -15,7 +16,8 @@ public class Responder extends UntypedActor{ } private CamelMessage createResponse(CamelMessage msg) { - return msg.mapBody(new Function() { + return msg.mapBody(new Mapper() { + @Override public String apply(String body) { return String.format("received %s", body); } diff --git a/akka-docs/rst/java/code/docs/camel/Transformer.java b/akka-docs/rst/java/code/docs/camel/Transformer.java index be077cf27d..17338f6837 100644 --- a/akka-docs/rst/java/code/docs/camel/Transformer.java +++ b/akka-docs/rst/java/code/docs/camel/Transformer.java @@ -2,6 +2,7 @@ package docs.camel; //#TransformOutgoingMessage import akka.camel.CamelMessage; import akka.camel.javaapi.UntypedProducerActor; +import akka.dispatch.Mapper; import akka.japi.Function; public class Transformer extends UntypedProducerActor{ @@ -16,7 +17,8 @@ public class Transformer extends UntypedProducerActor{ } private CamelMessage upperCase(CamelMessage msg) { - return msg.mapBody(new Function() { + return msg.mapBody(new Mapper() { + @Override public String apply(String body) { return body.toUpperCase(); } diff --git a/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java b/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java index 267d828cd7..3f35b5a2d4 100644 --- a/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java +++ b/akka-docs/rst/java/code/docs/camel/sample/http/HttpTransformer.java @@ -3,6 +3,7 @@ package docs.camel.sample.http; import akka.actor.Status; import akka.actor.UntypedActor; import akka.camel.CamelMessage; +import akka.dispatch.Mapper; import akka.japi.Function; //#HttpExample @@ -10,7 +11,8 @@ public class HttpTransformer extends UntypedActor{ public void onReceive(Object message) { if (message instanceof CamelMessage) { CamelMessage camelMessage = (CamelMessage) message; - CamelMessage replacedMessage = camelMessage.mapBody(new Function(){ + CamelMessage replacedMessage = camelMessage.mapBody(new Mapper(){ + @Override public String apply(Object body) { String text = new String((byte[])body); return text.replaceAll("Akka ", "AKKA "); diff --git a/akka-docs/rst/java/code/docs/camel/sample/route/Transformer.java b/akka-docs/rst/java/code/docs/camel/sample/route/Transformer.java index 6ea647edd5..e7f599af40 100644 --- a/akka-docs/rst/java/code/docs/camel/sample/route/Transformer.java +++ b/akka-docs/rst/java/code/docs/camel/sample/route/Transformer.java @@ -3,6 +3,7 @@ package docs.camel.sample.route; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.camel.CamelMessage; +import akka.dispatch.Mapper; import akka.japi.Function; public class Transformer extends UntypedActor { @@ -16,7 +17,8 @@ public class Transformer extends UntypedActor { if (message instanceof CamelMessage) { // example: transform message body "foo" to "- foo -" and forward result to producer CamelMessage camelMessage = (CamelMessage) message; - CamelMessage transformedMessage = camelMessage.mapBody(new Function(){ + CamelMessage transformedMessage = camelMessage.mapBody(new Mapper(){ + @Override public String apply(String body) { return String.format("- %s -",body); } diff --git a/akka-docs/rst/scala/code/docs/camel/CustomRoute.scala b/akka-docs/rst/scala/code/docs/camel/CustomRoute.scala index 5895a16ef1..ef34fcfb64 100644 --- a/akka-docs/rst/scala/code/docs/camel/CustomRoute.scala +++ b/akka-docs/rst/scala/code/docs/camel/CustomRoute.scala @@ -48,10 +48,7 @@ object CustomRoute { def receive = { case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body) } - override def onRouteDefinition(rd: RouteDefinition) = { - // Catch any exception and handle it by returning the exception message as response - rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end - } + override def onRouteDefinition = (rd) ⇒ rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end final override def preRestart(reason: Throwable, message: Option[Any]) { sender ! Failure(reason) diff --git a/akka-docs/rst/scala/code/docs/camel/HttpExample.scala b/akka-docs/rst/scala/code/docs/camel/HttpExample.scala index e5429f391b..9d1f665c64 100644 --- a/akka-docs/rst/scala/code/docs/camel/HttpExample.scala +++ b/akka-docs/rst/scala/code/docs/camel/HttpExample.scala @@ -21,7 +21,7 @@ object HttpExample { def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" override def transformOutgoingMessage(msg: Any) = msg match { - case msg: CamelMessage ⇒ msg.addHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + case msg: CamelMessage ⇒ msg.copy(headers = msg.headers ++ msg.headers(Set(Exchange.HTTP_PATH))) } override def routeResponse(msg: Any) { transformer forward msg } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 59fa1c490b..449807c6a6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -639,7 +639,7 @@ object Dependencies { val kernel = Seq(Test.scalatest, Test.junit) - val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito) + val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo) val camelSample = Seq(CamelSample.camelJetty) @@ -655,10 +655,12 @@ object Dependencies { val multiNodeSample = Seq(Test.scalatest) } - +object V { + val Camel = "2.10.0" +} object Dependency { // Compile - val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 + val camelCore = "org.apache.camel" % "camel-core" % V.Camel exclude("org.slf4j", "slf4j-api") // ApacheV2 val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD @@ -689,7 +691,7 @@ object Dependency { // Camel Sample object CamelSample { - val camelJetty = "org.apache.camel" % "camel-jetty" % "2.10.0" // ApacheV2 + val camelJetty = "org.apache.camel" % "camel-jetty" % V.Camel // ApacheV2 } }