#2568 - Adding the ability to configure body conversions in the config file.

This commit is contained in:
Viktor Klang 2012-09-26 17:12:30 +02:00
parent 616f8be730
commit cd0c96ff9c
10 changed files with 97 additions and 56 deletions

View file

@ -25,5 +25,9 @@ akka {
# The duration of time to await activation of an endpoint. # The duration of time to await activation of an endpoint.
activation-timeout = 10s activation-timeout = 10s
} }
#Scheme to FQCN mappings for CamelMessage body conversions
conversions {
"file" = "java.io.InputStream"
}
} }
} }

View file

@ -4,8 +4,8 @@
package akka.camel package akka.camel
import internal.component.CamelPath
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.camel.internal.component.CamelPath
import org.apache.camel.model.ProcessorDefinition import org.apache.camel.model.ProcessorDefinition
import scala.concurrent.util.Duration import scala.concurrent.util.Duration

View file

@ -8,10 +8,12 @@ import internal._
import akka.actor._ import akka.actor._
import org.apache.camel.ProducerTemplate import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.model.RouteDefinition
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.util.Duration import scala.concurrent.util.Duration
import java.util.concurrent.TimeUnit._
import scala.concurrent.util.FiniteDuration import scala.concurrent.util.FiniteDuration
import java.util.concurrent.TimeUnit._
import akka.ConfigurationException
/** /**
* Camel trait encapsulates the underlying camel machinery. * Camel trait encapsulates the underlying camel machinery.
@ -55,7 +57,7 @@ trait Camel extends Extension with Activation {
* Settings for the Camel Extension * Settings for the Camel Extension
* @param config the config * @param config the config
*/ */
class CamelSettings private[camel] (config: Config) { class CamelSettings private[camel] (config: Config, dynamicAccess: DynamicAccess) {
/** /**
* Configured setting for how long the actor should wait for activation before it fails. * Configured setting for how long the actor should wait for activation before it fails.
*/ */
@ -85,8 +87,23 @@ class CamelSettings private[camel] (config: Config) {
*/ */
final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache") final val streamingCache: Boolean = config.getBoolean("akka.camel.streamingCache")
} final val conversions: (String, RouteDefinition) RouteDefinition = {
import scala.collection.JavaConverters.asScalaSetConverter
val specifiedConversions = {
val section = config.getConfig("akka.camel.conversions")
section.entrySet.asScala.map(e (e.getKey, section.getString(e.getKey)))
}
val conversions = (Map[String, Class[_ <: AnyRef]]() /: specifiedConversions) {
case (m, (key, fqcn))
m.updated(key, dynamicAccess.getClassFor[AnyRef](fqcn).recover {
case e throw new ConfigurationException("Could not find/load Camel Converter class [" + fqcn + "]", e)
}.get)
}
(s: String, r: RouteDefinition) conversions.get(s).fold(r)(r.convertBodyTo)
}
}
/** /**
* This class can be used to get hold of an instance of the Camel class bound to the actor system. * This class can be used to get hold of an instance of the Camel class bound to the actor system.
* <p>For example: * <p>For example:

View file

@ -179,7 +179,7 @@ private[camel] class ConsumerRegistrar(activationTracker: ActorRef) extends Acto
case Register(consumer, endpointUri, Some(consumerConfig)) case Register(consumer, endpointUri, Some(consumerConfig))
try { try {
// if this throws, the supervisor stops the consumer and de-registers it on termination // if this throws, the supervisor stops the consumer and de-registers it on termination
camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig)) camelContext.addRoutes(new ConsumerActorRouteBuilder(endpointUri, consumer, consumerConfig, camel.settings))
activationTracker ! EndpointActivated(consumer) activationTracker ! EndpointActivated(consumer)
} catch { } catch {
case NonFatal(e) throw new ActorActivationException(consumer, e) case NonFatal(e) throw new ActorActivationException(consumer, e)

View file

@ -4,15 +4,11 @@
package akka.camel.internal package akka.camel.internal
import akka.camel._
import component.CamelPath
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
import akka.actor._ import akka.actor._
import akka.camel._
import akka.camel.internal.component.CamelPath
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.model.RouteDefinition import org.apache.camel.model.RouteDefinition
import akka.serialization.Serializer
/** /**
* For internal use only. * For internal use only.
@ -22,18 +18,15 @@ import akka.serialization.Serializer
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig) extends RouteBuilder { private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig, settings: CamelSettings) extends RouteBuilder {
protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout) protected def targetActorUri = CamelPath.toUri(consumer, config.autoAck, config.replyTimeout)
def configure() { def configure(): Unit =
val scheme = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." applyUserRouteCustomization(
val route = from(endpointUri).routeId(consumer.path.toString) settings.conversions.apply(
val converted = Conversions(scheme, route) endpointUri take endpointUri.indexOf(":"), // e.g. "http" from "http://whatever/..."
val userCustomized = applyUserRouteCustomization(converted) from(endpointUri).routeId(consumer.path.toString))).to(targetActorUri)
userCustomized.to(targetActorUri)
}
def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd) def applyUserRouteCustomization(rd: RouteDefinition) = config.onRouteDefinition(rd)
} }

View file

@ -1,6 +1,5 @@
package akka.camel.internal package akka.camel.internal
import akka.actor.{ ActorRef, Props, ActorSystem }
import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent } import akka.camel.internal.component.{ DurationTypeConverter, ActorComponent }
import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._ import scala.Predef._
@ -16,6 +15,7 @@ import akka.util.Timeout
import akka.pattern.ask import akka.pattern.ask
import java.io.InputStream import java.io.InputStream
import org.apache.camel.model.RouteDefinition import org.apache.camel.model.RouteDefinition
import akka.actor.{ ExtendedActorSystem, ActorRef, Props, ActorSystem }
/** /**
* For internal use only. * For internal use only.
@ -26,7 +26,7 @@ import org.apache.camel.model.RouteDefinition
* In the typical scenario, when camel is used with akka extension, it is natural that camel reuses the actor system it extends. * In the typical scenario, when camel is used with akka extension, it is natural that camel reuses the actor system it extends.
* Also by not creating extra internal actor system we are conserving resources. * Also by not creating extra internal actor system we are conserving resources.
*/ */
private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { private[camel] class DefaultCamel(val system: ExtendedActorSystem) extends Camel {
val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor") val supervisor = system.actorOf(Props[CamelSupervisor], "camel-supervisor")
/** /**
* For internal use only. * For internal use only.
@ -43,7 +43,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
ctx ctx
} }
val settings = new CamelSettings(system.settings.config) val settings = new CamelSettings(system.settings.config, system.dynamicAccess)
lazy val template: ProducerTemplate = context.createProducerTemplate() lazy val template: ProducerTemplate = context.createProducerTemplate()
@ -100,18 +100,3 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
case EndpointFailedToDeActivate(`endpoint`, cause) throw cause case EndpointFailedToDeActivate(`endpoint`, cause) throw cause
}) })
} }
/**
* For internal use only.
*/
private[camel] object Conversions {
//FIXME Add this to the configuration, and move this functionality to the Camel Extension.
private val bodyConversions = Map(
"file" -> classOf[InputStream])
def apply(scheme: String, routeDefinition: RouteDefinition): RouteDefinition = bodyConversions.get(scheme) match {
case Some(clazz) routeDefinition.convertBodyTo(clazz)
case None routeDefinition
}
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import scala.concurrent.util.Duration
import java.util.concurrent.TimeUnit._
class CamelConfigSpec extends WordSpec with MustMatchers {
"CamelConfigSpec" must {
"have correct config" in {
val system = ActorSystem("CamelConfigSpec")
try {
val settings = CamelExtension(system).settings
val config = system.settings.config
settings.activationTimeout must be === Duration(config.getMilliseconds("akka.camel.consumer.activation-timeout"), MILLISECONDS)
settings.autoAck must be === config.getBoolean("akka.camel.consumer.auto-ack")
settings.replyTimeout must be === Duration(config.getMilliseconds("akka.camel.consumer.reply-timeout"), MILLISECONDS)
settings.streamingCache must be === config.getBoolean("akka.camel.streamingCache")
settings.jmxStatistics must be === config.getBoolean("akka.camel.jmx")
val conversions = config.getConfig("akka.camel.conversions")
conversions.getString("file") must be === "java.io.InputStream"
conversions.entrySet.size must be === 1
} finally system.shutdown()
}
}
}

View file

@ -8,7 +8,6 @@ import akka.camel.TestSupport.SharedCamelSystem
import internal.DefaultCamel import internal.DefaultCamel
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.scalatest.mock.MockitoSugar import org.scalatest.mock.MockitoSugar
import akka.actor.ActorSystem
import org.apache.camel.{ CamelContext, ProducerTemplate } import org.apache.camel.{ CamelContext, ProducerTemplate }
import org.scalatest.WordSpec import org.scalatest.WordSpec
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -16,12 +15,14 @@ import akka.actor.ActorSystem.Settings
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spi.Registry import org.apache.camel.spi.Registry
import akka.actor.{ ExtendedActorSystem, ActorSystem }
class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { class DefaultCamelTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar {
import org.mockito.Mockito.{ when, verify } import org.mockito.Mockito.{ when, verify }
val sys = mock[ActorSystem] val sys = mock[ExtendedActorSystem]
val config = ConfigFactory.defaultReference() val config = ConfigFactory.defaultReference()
when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess
when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem")) when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem"))
when(sys.name) thenReturn ("mocksystem") when(sys.name) thenReturn ("mocksystem")

View file

@ -13,7 +13,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import concurrent.util.{ FiniteDuration, Duration } import concurrent.util.{ FiniteDuration, Duration }
import java.lang.String import java.lang.String
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
import akka.camel._ import akka.camel._
import internal.{ DefaultCamel, CamelExchangeAdapter } import internal.{ DefaultCamel, CamelExchangeAdapter }
import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.{ Suite, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
@ -29,6 +28,7 @@ import akka.testkit.{ TimingTest, TestKit, TestProbe }
import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.impl.DefaultCamelContext
import concurrent.{ Await, Promise, Future } import concurrent.{ Await, Promise, Future }
import akka.util.Timeout import akka.util.Timeout
import akka.actor._
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
implicit val timeout = Timeout(10 seconds) implicit val timeout = Timeout(10 seconds)
@ -313,9 +313,10 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
probe = TestProbe() probe = TestProbe()
val sys = mock[ActorSystem] val sys = mock[ExtendedActorSystem]
val config = ConfigFactory.defaultReference() val config = ConfigFactory.defaultReference()
when(sys.dispatcher) thenReturn system.dispatcher when(sys.dispatcher) thenReturn system.dispatcher
when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess
when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem")) when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem"))
when(sys.name) thenReturn ("mocksystem") when(sys.name) thenReturn ("mocksystem")
@ -336,7 +337,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
} }
} }
} }
""")) """).withFallback(config), sys.dynamicAccess)
} }
camel = camelWithMocks camel = camelWithMocks

View file

@ -11,7 +11,6 @@ import scala.concurrent.util.duration._
import scala.concurrent.util.Duration import scala.concurrent.util.Duration
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.apache.camel.TypeConversionException import org.apache.camel.TypeConversionException
import language.postfixOps
class DurationConverterSpec extends WordSpec with MustMatchers { class DurationConverterSpec extends WordSpec with MustMatchers {
import DurationTypeConverter._ import DurationTypeConverter._