Merge pull request #609 from akka/wip_ticket_2322_1928
ticket #2322 updated camel to 2.10.0
This commit is contained in:
commit
e1a6e23d7f
17 changed files with 144 additions and 193 deletions
|
|
@ -91,6 +91,7 @@ private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext)
|
|||
log.debug("Published actor [{}] at endpoint [{}]", consumerConfig, endpointUri)
|
||||
case UnregisterConsumer(consumer) ⇒
|
||||
camelContext.stopRoute(consumer.path.toString)
|
||||
camelContext.removeRoute(consumer.path.toString)
|
||||
context.sender ! EndpointDeActivated(consumer)
|
||||
log.debug("Unpublished actor [{}] from endpoint [{}]", consumer, consumer.path)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ import java.util.concurrent.{ TimeoutException, CountDownLatch }
|
|||
import akka.util.Timeout
|
||||
import akka.camel.internal.CamelExchangeAdapter
|
||||
import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
|
||||
import support.TypeConverterSupport
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
* Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]].
|
||||
|
|
@ -187,21 +189,15 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
|
|||
/**
|
||||
* For internal use only. Converts Strings to [[scala.concurrent.util.Duration]]
|
||||
*/
|
||||
private[camel] object DurationTypeConverter extends TypeConverter {
|
||||
override def convertTo[T](`type`: Class[T], value: AnyRef): T = `type`.cast(try {
|
||||
val d = Duration(value.toString)
|
||||
if (`type`.isInstance(d)) d else null
|
||||
} catch {
|
||||
case NonFatal(_) ⇒ null
|
||||
})
|
||||
private[camel] object DurationTypeConverter extends TypeConverterSupport {
|
||||
|
||||
def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = convertTo(`type`, value)
|
||||
def mandatoryConvertTo[T](`type`: Class[T], value: AnyRef): T = convertTo(`type`, value) match {
|
||||
case null ⇒ throw new NoTypeConversionAvailableException(value, `type`)
|
||||
case some ⇒ some
|
||||
}
|
||||
def mandatoryConvertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = mandatoryConvertTo(`type`, value)
|
||||
def toString(duration: Duration): String = duration.toNanos + " nanos"
|
||||
@throws(classOf[TypeConversionException])
|
||||
def convertTo[T](valueType: Class[T], exchange: Exchange, value: AnyRef): T = valueType.cast(try {
|
||||
val d = Duration(value.toString)
|
||||
if (valueType.isInstance(d)) d else null
|
||||
} catch {
|
||||
case NonFatal(throwable) ⇒ throw new TypeConversionException(value, valueType, throwable)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -97,7 +97,6 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test")
|
||||
system.stop(consumer)
|
||||
Await.result(camel.deactivationFutureFor(consumer), defaultTimeout)
|
||||
|
||||
camel.routeCount must be(0)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,13 +4,11 @@
|
|||
|
||||
package akka.camel.internal.component
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import scala.concurrent.util.duration._
|
||||
import scala.concurrent.util.Duration
|
||||
import org.scalatest.WordSpec
|
||||
import org.apache.camel.NoTypeConversionAvailableException
|
||||
import org.apache.camel.{ TypeConversionException, NoTypeConversionAvailableException }
|
||||
|
||||
class DurationConverterSpec extends WordSpec with MustMatchers {
|
||||
import DurationTypeConverter._
|
||||
|
|
@ -20,21 +18,21 @@ class DurationConverterSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"DurationTypeConverter must do the roundtrip" in {
|
||||
convertTo(classOf[Duration], DurationTypeConverter.toString(10 seconds)) must be(10 seconds)
|
||||
convertTo(classOf[Duration], (10 seconds).toString()) must be(10 seconds)
|
||||
}
|
||||
|
||||
"DurationTypeConverter must throw if invalid format" in {
|
||||
convertTo(classOf[Duration], "abc nanos") must be === null
|
||||
tryConvertTo(classOf[Duration], "abc nanos") must be === null
|
||||
|
||||
intercept[NoTypeConversionAvailableException] {
|
||||
intercept[TypeConversionException] {
|
||||
mandatoryConvertTo(classOf[Duration], "abc nanos") must be(10 nanos)
|
||||
}.getValue must be === "abc nanos"
|
||||
}
|
||||
|
||||
"DurationTypeConverter must throw if doesn't end with time unit" in {
|
||||
convertTo(classOf[Duration], "10233") must be === null
|
||||
tryConvertTo(classOf[Duration], "10233") must be === null
|
||||
|
||||
intercept[NoTypeConversionAvailableException] {
|
||||
intercept[TypeConversionException] {
|
||||
mandatoryConvertTo(classOf[Duration], "10233") must be(10 nanos)
|
||||
}.getValue must be === "10233"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,45 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camelexamples
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.camel._
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.{ Actor, OneForOneStrategy }
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
private[camelexamples] object ExamplesSupport {
|
||||
val retry3xWithin1s = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second) {
|
||||
case _: Exception ⇒ Restart
|
||||
}
|
||||
}
|
||||
|
||||
private[camelexamples] class SysOutConsumer extends Consumer {
|
||||
override def activationTimeout = 10 seconds
|
||||
def endpointUri = "file://data/input/CamelConsumer"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ {
|
||||
printf("Received '%s'\n", msg.bodyAs[String])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[camelexamples] class TroubleMaker extends Consumer {
|
||||
def endpointUri = "WRONG URI"
|
||||
|
||||
println("Trying to instantiate conumer with uri: " + endpointUri)
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
|
||||
private[camelexamples] class SysOutActor(implicit camel: Camel) extends Actor {
|
||||
implicit val camelContext = camel.context
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ {
|
||||
printf("Received '%s'\n", msg.bodyAs[String])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1 +0,0 @@
|
|||
This package is outside of akka.camel because we don't want to use private[camel] features in examples.
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camelexamples
|
||||
|
||||
import language.implicitConversions
|
||||
|
||||
import java.io.FileWriter
|
||||
|
||||
private[camelexamples] object RichString {
|
||||
implicit def toRichString(s: String): RichString = new RichString(s)
|
||||
}
|
||||
|
||||
private[camelexamples] class RichString(s: String) {
|
||||
def saveAs(fileName: String) = write(fileName, s)
|
||||
def >>(fileName: String) = this.saveAs(fileName)
|
||||
def <<(content: String) = write(s, content)
|
||||
|
||||
private[this] def write(fileName: String, content: String) {
|
||||
val f = new FileWriter(fileName)
|
||||
f.write(content)
|
||||
f.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,21 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camelexamples
|
||||
|
||||
import akka.actor.{ Props, ActorSystem }
|
||||
import RichString._
|
||||
|
||||
private[camelexamples] object _1_SimpleConsumer extends App {
|
||||
val system = ActorSystem("test")
|
||||
|
||||
system.actorOf(Props[SysOutConsumer])
|
||||
|
||||
"data/input/CamelConsumer/file1.txt" << "test data " + math.random
|
||||
|
||||
Thread.sleep(2000)
|
||||
|
||||
system.shutdown()
|
||||
|
||||
}
|
||||
|
|
@ -1,39 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camelexamples
|
||||
|
||||
import akka.actor.{ PoisonPill, Terminated, Props, ActorSystem, Actor }
|
||||
import ExamplesSupport._
|
||||
import RichString._
|
||||
|
||||
private[camelexamples] object SupervisedConsumersExample extends App {
|
||||
|
||||
val system = ActorSystem("test1")
|
||||
|
||||
system.actorOf(Props(new Actor {
|
||||
context.watch(context.actorOf(Props[EndpointManager]))
|
||||
def receive = {
|
||||
case Terminated(ref) ⇒ system.shutdown()
|
||||
}
|
||||
}))
|
||||
|
||||
"data/input/CamelConsumer/file1.txt" << "test data " + math.random
|
||||
}
|
||||
|
||||
private[camelexamples] class EndpointManager extends Actor {
|
||||
import context._
|
||||
|
||||
override def supervisorStrategy() = retry3xWithin1s
|
||||
|
||||
watch(actorOf(Props[SysOutConsumer]))
|
||||
watch(actorOf(Props[TroubleMaker]))
|
||||
|
||||
def receive = {
|
||||
case Terminated(ref) ⇒ {
|
||||
printf("Hey! One of the endpoints has died: %s. I am doing sepuku...\n", ref)
|
||||
self ! PoisonPill
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camelexamples
|
||||
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
import akka.actor.{ Props, ActorSystem }
|
||||
import akka.camel._
|
||||
import RichString._
|
||||
|
||||
private[camelexamples] object _3_SimpleActorEndpoint extends App {
|
||||
|
||||
val system = ActorSystem("test")
|
||||
val camel = CamelExtension(system)
|
||||
|
||||
val actor = system.actorOf(Props[SysOutActor])
|
||||
|
||||
camel.context.addRoutes(new RouteBuilder() {
|
||||
def configure() {
|
||||
from("file://data/input/CamelConsumer").to(actor)
|
||||
}
|
||||
})
|
||||
|
||||
"data/input/CamelConsumer/file1.txt" << "test data " + math.random
|
||||
|
||||
Thread.sleep(3000)
|
||||
|
||||
system.shutdown()
|
||||
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ public class HttpProducer extends UntypedProducerActor{
|
|||
CamelMessage camelMessage = (CamelMessage) message;
|
||||
Set<String> httpPath = new HashSet<String>();
|
||||
httpPath.add(Exchange.HTTP_PATH);
|
||||
return camelMessage.addHeaders(camelMessage.getHeaders(httpPath));
|
||||
return camelMessage.withHeaders(camelMessage.getHeaders(httpPath));
|
||||
} else return super.onTransformOutgoingMessage(message);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,9 +10,10 @@ public class HttpTransformer extends UntypedActor{
|
|||
public void onReceive(Object message) {
|
||||
if (message instanceof CamelMessage) {
|
||||
CamelMessage camelMessage = (CamelMessage) message;
|
||||
CamelMessage replacedMessage = camelMessage.mapBody(new Function<String, String>(){
|
||||
public String apply(String body) {
|
||||
return body.replaceAll("Akka ", "AKKA ");
|
||||
CamelMessage replacedMessage = camelMessage.mapBody(new Function<Object, String>(){
|
||||
public String apply(Object body) {
|
||||
String text = new String((byte[])body);
|
||||
return text.replaceAll("Akka ", "AKKA ");
|
||||
}
|
||||
});
|
||||
getSender().tell(replacedMessage);
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ object HttpExample {
|
|||
|
||||
class HttpTransformer extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") })
|
||||
case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: Array[Byte] ⇒ new String(body).replaceAll("Akka ", "AKKA ") })
|
||||
case msg: Failure ⇒ sender ! msg
|
||||
}
|
||||
}
|
||||
|
|
|
|||
28
akka-samples/akka-sample-camel/README
Normal file
28
akka-samples/akka-sample-camel/README
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
Camel
|
||||
===
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
To build and run Camel you need [Simple Build Tool][sbt] (sbt).
|
||||
|
||||
Running
|
||||
-------
|
||||
|
||||
First time, 'sbt update' to get dependencies, then use 'sbt run'.
|
||||
Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run':
|
||||
> cd $AKKA_HOME
|
||||
|
||||
> % sbt
|
||||
|
||||
> > project akka-sample-camel
|
||||
|
||||
> > run
|
||||
|
||||
> > Choose 1 or 2 depending on what sample you wish to run
|
||||
|
||||
Notice
|
||||
------
|
||||
|
||||
[akka]: http://akka.io
|
||||
[sbt]: http://code.google.com/p/simple-build-tool/
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
import akka.actor.Status.Failure
|
||||
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
|
||||
import akka.camel.{ Producer, CamelMessage, Consumer }
|
||||
import org.apache.camel.{ Exchange }
|
||||
|
||||
/**
|
||||
* Asynchronous routing and transformation example
|
||||
*/
|
||||
object AsyncRouteAndTransform extends App {
|
||||
val system = ActorSystem("rewriteAkkaToAKKA")
|
||||
val httpTransformer = system.actorOf(Props[HttpTransformer], "transformer")
|
||||
val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer)), "producer")
|
||||
val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer)), "consumer")
|
||||
}
|
||||
|
||||
class HttpConsumer(producer: ActorRef) extends Consumer {
|
||||
def endpointUri = "jetty:http://0.0.0.0:8875/"
|
||||
def receive = {
|
||||
case msg ⇒ producer forward msg
|
||||
}
|
||||
}
|
||||
|
||||
class HttpProducer(transformer: ActorRef) extends Actor with Producer {
|
||||
def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true"
|
||||
|
||||
override def transformOutgoingMessage(msg: Any) = msg match {
|
||||
case msg: CamelMessage ⇒ msg.withHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
|
||||
}
|
||||
|
||||
override def routeResponse(msg: Any) {
|
||||
transformer forward msg
|
||||
}
|
||||
}
|
||||
|
||||
class HttpTransformer extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒
|
||||
val transformedMsg = msg.mapBody {
|
||||
(body: Array[Byte]) ⇒
|
||||
new String(body).replaceAll("Akka", "<b>AKKA</b>")
|
||||
// just to make the result look a bit better.
|
||||
.replaceAll("href=\"/resources", "href=\"http://akka.io/resources")
|
||||
.replaceAll("src=\"/resources", "src=\"http://akka.io/resources")
|
||||
}
|
||||
sender ! transformedMsg
|
||||
case msg: Failure ⇒ sender ! msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
import akka.actor.{ Props, ActorSystem }
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import java.io.File
|
||||
import org.apache.camel.Exchange
|
||||
|
||||
object SimpleFileConsumer extends App {
|
||||
val subDir = "consume-files"
|
||||
val tmpDir = System.getProperty("java.io.tmpdir")
|
||||
val consumeDir = new File(tmpDir, subDir)
|
||||
consumeDir.mkdirs()
|
||||
val tmpDirUri = "file://%s/%s" format (tmpDir, subDir)
|
||||
|
||||
val system = ActorSystem("consume-files")
|
||||
val fileConsumer = system.actorOf(Props(new FileConsumer(tmpDirUri)), "fileConsumer")
|
||||
println(String.format("Put a text file in '%s', the consumer will pick it up!", consumeDir))
|
||||
}
|
||||
|
||||
class FileConsumer(uri: String) extends Consumer {
|
||||
def endpointUri = uri
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒
|
||||
println("Received file %s with content:\n%s".format(msg.getHeader(Exchange.FILE_NAME), msg.bodyAs[String]))
|
||||
}
|
||||
}
|
||||
|
|
@ -282,7 +282,16 @@ object AkkaBuild extends Build {
|
|||
id = "akka-samples",
|
||||
base = file("akka-samples"),
|
||||
settings = parentSettings,
|
||||
aggregate = Seq(fsmSample, helloSample, helloKernelSample, remoteSample)
|
||||
aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample)
|
||||
)
|
||||
|
||||
lazy val camelSample = Project(
|
||||
id = "akka-sample-camel",
|
||||
base = file("akka-samples/akka-sample-camel"),
|
||||
dependencies = Seq(actor, camel),
|
||||
settings = defaultSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.camelSample
|
||||
)
|
||||
)
|
||||
|
||||
lazy val fsmSample = Project(
|
||||
|
|
@ -517,12 +526,15 @@ object Dependencies {
|
|||
|
||||
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito)
|
||||
|
||||
val camelSample = Seq(CamelSample.camelJetty)
|
||||
|
||||
val osgi = Seq(osgiCore,Test.logback, Test.commonsIo, Test.pojosr, Test.tinybundles, Test.scalatest, Test.junit)
|
||||
|
||||
val osgiAries = Seq(osgiCore, ariesBlueprint, Test.ariesProxy)
|
||||
|
||||
val tutorials = Seq(Test.scalatest, Test.junit)
|
||||
|
||||
|
||||
val docs = Seq(Test.scalatest, Test.junit, Test.junitIntf)
|
||||
|
||||
val zeroMQ = Seq(protobuf, zeroMQClient, Test.scalatest, Test.junit)
|
||||
|
|
@ -530,7 +542,7 @@ object Dependencies {
|
|||
|
||||
object Dependency {
|
||||
// Compile
|
||||
val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
|
||||
val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
|
||||
val config = "com.typesafe" % "config" % "0.5.0" // ApacheV2
|
||||
val netty = "io.netty" % "netty" % "3.5.3.Final" // ApacheV2
|
||||
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
|
||||
|
|
@ -557,6 +569,12 @@ object Dependency {
|
|||
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
|
||||
val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT
|
||||
}
|
||||
|
||||
// Camel Sample
|
||||
object CamelSample {
|
||||
val camelJetty = "org.apache.camel" % "camel-jetty" % "2.10.0" // ApacheV2
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// OSGi settings
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue