diff --git a/akka-camel/src/main/resources/reference.conf b/akka-camel/src/main/resources/reference.conf index 302a31b754..43ef8d17a5 100644 --- a/akka-camel/src/main/resources/reference.conf +++ b/akka-camel/src/main/resources/reference.conf @@ -12,18 +12,22 @@ akka { # enable/disable streaming cache on the Camel Context streamingCache = on consumer { - # Configured setting which determines whether one-way communications between an endpoint and this consumer actor - # should be auto-acknowledged or application-acknowledged. - # This flag has only effect when exchange is in-only. - auto-ack = on + # Configured setting which determines whether one-way communications between an endpoint and this consumer actor + # should be auto-acknowledged or application-acknowledged. + # This flag has only effect when exchange is in-only. + auto-ack = on - # When endpoint is out-capable (can produce responses) reply-timeout is the maximum time - # the endpoint can take to send the response before the message exchange fails. - # This setting is used for out-capable, in-only, manually acknowledged communication. - reply-timeout = 1m + # When endpoint is out-capable (can produce responses) reply-timeout is the maximum time + # the endpoint can take to send the response before the message exchange fails. + # This setting is used for out-capable, in-only, manually acknowledged communication. + reply-timeout = 1m - # The duration of time to await activation of an endpoint. - activation-timeout = 10s + # The duration of time to await activation of an endpoint. + activation-timeout = 10s + } + #Scheme to FQCN mappings for CamelMessage body conversions + conversions { + "file" = "java.io.InputStream" } } } diff --git a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala index ebc99c7a92..9cb84a2a2a 100644 --- a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala +++ b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala @@ -4,8 +4,8 @@ package akka.camel -import internal.component.CamelPath import akka.actor.ActorRef +import akka.camel.internal.component.CamelPath import org.apache.camel.model.ProcessorDefinition import scala.concurrent.util.Duration diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala index 43682c49c5..db32611868 100644 --- a/akka-camel/src/main/scala/akka/camel/Camel.scala +++ b/akka-camel/src/main/scala/akka/camel/Camel.scala @@ -8,10 +8,12 @@ import internal._ import akka.actor._ import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.model.RouteDefinition import com.typesafe.config.Config import scala.concurrent.util.Duration -import java.util.concurrent.TimeUnit._ import scala.concurrent.util.FiniteDuration +import java.util.concurrent.TimeUnit._ +import akka.ConfigurationException /** * Camel trait encapsulates the underlying camel machinery. @@ -55,7 +57,7 @@ trait Camel extends Extension with Activation { * Settings for the Camel Extension * @param config the config */ -class CamelSettings private[camel] (config: Config) { +class CamelSettings private[camel] (config: Config, dynamicAccess: DynamicAccess) { /** * Configured setting for how long the actor should wait for activation before it fails. */ @@ -85,8 +87,23 @@ class CamelSettings private[camel] (config: Config) { */ final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache") -} + final val conversions: (String, RouteDefinition) ⇒ RouteDefinition = { + import scala.collection.JavaConverters.asScalaSetConverter + val specifiedConversions = { + val section = config.getConfig("akka.camel.conversions") + section.entrySet.asScala.map(e ⇒ (e.getKey, section.getString(e.getKey))) + } + val conversions = (Map[String, Class[_ <: AnyRef]]() /: specifiedConversions) { + case (m, (key, fqcn)) ⇒ + m.updated(key, dynamicAccess.getClassFor[AnyRef](fqcn).recover { + case e ⇒ throw new ConfigurationException("Could not find/load Camel Converter class [" + fqcn + "]", e) + }.get) + } + (s: String, r: RouteDefinition) ⇒ conversions.get(s).fold(r)(r.convertBodyTo) + } + +} /** * This class can be used to get hold of an instance of the Camel class bound to the actor system. *

For example: diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala index 1a7fff6f92..b19bdbc0a2 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -179,7 +179,7 @@ private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Acto case Register(consumer, endpointUri, Some(consumerConfig)) ⇒ try { // if this throws, the supervisor stops the consumer and de-registers it on termination - camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig)) + camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig, camel.settings)) activationTracker ! EndpointActivated(consumer) } catch { case NonFatal(e) ⇒ throw new ActorActivationException(consumer, e) diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala index cd8b70bed5..24e2c9a58e 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala @@ -4,15 +4,11 @@ package akka.camel.internal -import akka.camel._ -import component.CamelPath -import java.io.InputStream - -import org.apache.camel.builder.RouteBuilder - import akka.actor._ +import akka.camel._ +import akka.camel.internal.component.CamelPath +import org.apache.camel.builder.RouteBuilder import org.apache.camel.model.RouteDefinition -import akka.serialization.Serializer /** * For internal use only. @@ -22,18 +18,15 @@ import akka.serialization.Serializer * * @author Martin Krasser */ -private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { +private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig, settings: CamelSettings) extends RouteBuilder { protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout) - def configure() { - val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - val route = from(endpointUri).routeId(consumer.path.toString) - val converted = Conversions(scheme, route) - val userCustomized = applyUserRouteCustomization(converted) - userCustomized.to(targetActorUri) - } + def configure(): Unit = + applyUserRouteCustomization( + settings.conversions.apply( + endpointUri take endpointUri.indexOf(":"), // e.g. "http" from "http://whatever/..." + from(endpointUri).routeId(consumer.path.toString))).to(targetActorUri) def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd) - } diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index 016b923bf0..072e578a05 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -1,6 +1,5 @@ package akka.camel.internal -import akka.actor.{ ActorRef, Props, ActorSystem } import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent } import org.apache.camel.impl.DefaultCamelContext import scala.Predef._ @@ -16,6 +15,7 @@ import akka.util.Timeout import akka.pattern.ask import java.io.InputStream import org.apache.camel.model.RouteDefinition +import akka.actor.{ ExtendedActorSystem, ActorRef, Props, ActorSystem } /** * For internal use only. @@ -26,7 +26,7 @@ import org.apache.camel.model.RouteDefinition * In the typical scenario, when camel is used with akka extension, it is natural that camel reuses the actor system it extends. * Also by not creating extra internal actor system we are conserving resources. */ -private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { +private[camel] class DefaultCamel(val system: ExtendedActorSystem) extends Camel { val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor") /** * For internal use only. @@ -43,7 +43,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { ctx } - val settings = new CamelSettings(system.settings.config) + val settings = new CamelSettings(system.settings.config, system.dynamicAccess) lazy val template: ProducerTemplate = context.createProducerTemplate() @@ -99,19 +99,4 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { case EndpointDeActivated(`endpoint`) ⇒ endpoint case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause }) -} - -/** - * For internal use only. - */ -private[camel] object Conversions { - //FIXME Add this to the configuration, and move this functionality to the Camel Extension. - private val bodyConversions = Map( - "file" -> classOf[InputStream]) - - def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match { - case Some(clazz) ⇒ routeDefinition.convertBodyTo(clazz) - case None ⇒ routeDefinition - } -} - +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camel/CamelConfigSpec.scala b/akka-camel/src/test/scala/akka/camel/CamelConfigSpec.scala new file mode 100644 index 0000000000..7a5b996243 --- /dev/null +++ b/akka-camel/src/test/scala/akka/camel/CamelConfigSpec.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.camel +import org.scalatest.matchers.MustMatchers +import org.scalatest.WordSpec +import akka.actor.ActorSystem +import scala.concurrent.util.Duration +import java.util.concurrent.TimeUnit._ + +class CamelConfigSpec extends WordSpec with MustMatchers { + + "CamelConfigSpec" must { + "have correct config" in { + val system = ActorSystem("CamelConfigSpec") + try { + val settings = CamelExtension(system).settings + + val config = system.settings.config + + settings.activationTimeout must be === Duration(config.getMilliseconds("akka.camel.consumer.activation-timeout"), MILLISECONDS) + + settings.autoAck must be === config.getBoolean("akka.camel.consumer.auto-ack") + + settings.replyTimeout must be === Duration(config.getMilliseconds("akka.camel.consumer.reply-timeout"), MILLISECONDS) + + settings.streamingCache must be === config.getBoolean("akka.camel.streamingCache") + + settings.jmxStatistics must be === config.getBoolean("akka.camel.jmx") + + val conversions = config.getConfig("akka.camel.conversions") + + conversions.getString("file") must be === "java.io.InputStream" + conversions.entrySet.size must be === 1 + + } finally system.shutdown() + } + } +} + diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala index c649424122..95f4a434a8 100644 --- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala +++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala @@ -8,7 +8,6 @@ import akka.camel.TestSupport.SharedCamelSystem import internal.DefaultCamel import org.scalatest.matchers.MustMatchers import org.scalatest.mock.MockitoSugar -import akka.actor.ActorSystem import org.apache.camel.{ CamelContext, ProducerTemplate } import org.scalatest.WordSpec import akka.event.LoggingAdapter @@ -16,12 +15,14 @@ import akka.actor.ActorSystem.Settings import com.typesafe.config.ConfigFactory import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.spi.Registry +import akka.actor.{ ExtendedActorSystem, ActorSystem } class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { import org.mockito.Mockito.{ when, verify } - val sys = mock[ActorSystem] + val sys = mock[ExtendedActorSystem] val config = ConfigFactory.defaultReference() + when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem")) when(sys.name) thenReturn ("mocksystem") diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 3a7cca93e2..258d34b4b8 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -13,7 +13,6 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.util.duration._ import concurrent.util.{ FiniteDuration, Duration } import java.lang.String -import akka.actor.{ ActorRef, Props, ActorSystem, Actor } import akka.camel._ import internal.{ DefaultCamel, CamelExchangeAdapter } import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } @@ -29,6 +28,7 @@ import akka.testkit.{ TimingTest, TestKit, TestProbe } import org.apache.camel.impl.DefaultCamelContext import concurrent.{ Await, Promise, Future } import akka.util.Timeout +import akka.actor._ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { implicit val timeout = Timeout(10 seconds) @@ -313,9 +313,10 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo probe = TestProbe() - val sys = mock[ActorSystem] + val sys = mock[ExtendedActorSystem] val config = ConfigFactory.defaultReference() when(sys.dispatcher) thenReturn system.dispatcher + when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem")) when(sys.name) thenReturn ("mocksystem") @@ -336,7 +337,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } } } - """)) + """).withFallback(config), sys.dynamicAccess) } camel = camelWithMocks diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala index e8918a5b67..307b0d71d7 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala @@ -11,7 +11,6 @@ import scala.concurrent.util.duration._ import scala.concurrent.util.Duration import org.scalatest.WordSpec import org.apache.camel.TypeConversionException -import language.postfixOps class DurationConverterSpec extends WordSpec with MustMatchers { import DurationTypeConverter._