diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index e8cfdbd609..5531cda109 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -91,6 +91,7 @@ private[camel] class IdempotentCamelConsumerRegistry(camelContext: CamelContext) log.debug("Published actor [{}] at endpoint [{}]", consumerConfig, endpointUri) case UnregisterConsumer(consumer) ⇒ camelContext.stopRoute(consumer.path.toString) + camelContext.removeRoute(consumer.path.toString) context.sender ! EndpointDeActivated(consumer) log.debug("Unpublished actor [{}] from endpoint [{}]", consumer, consumer.path) } diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 7a6f7a915a..4dff298ecb 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -23,6 +23,8 @@ import java.util.concurrent.{ TimeoutException, CountDownLatch } import akka.util.Timeout import akka.camel.internal.CamelExchangeAdapter import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage } +import support.TypeConverterSupport + /** * For internal use only. * Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]]. @@ -187,21 +189,15 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex /** * For internal use only. Converts Strings to [[scala.concurrent.util.Duration]] */ -private[camel] object DurationTypeConverter extends TypeConverter { - override def convertTo[T](`type`: Class[T], value: AnyRef): T = `type`.cast(try { - val d = Duration(value.toString) - if (`type`.isInstance(d)) d else null - } catch { - case NonFatal(_) ⇒ null - }) +private[camel] object DurationTypeConverter extends TypeConverterSupport { - def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = convertTo(`type`, value) - def mandatoryConvertTo[T](`type`: Class[T], value: AnyRef): T = convertTo(`type`, value) match { - case null ⇒ throw new NoTypeConversionAvailableException(value, `type`) - case some ⇒ some - } - def mandatoryConvertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = mandatoryConvertTo(`type`, value) - def toString(duration: Duration): String = duration.toNanos + " nanos" + @throws(classOf[TypeConversionException]) + def convertTo[T](valueType: Class[T], exchange: Exchange, value: AnyRef): T = valueType.cast(try { + val d = Duration(value.toString) + if (valueType.isInstance(d)) d else null + } catch { + case NonFatal(throwable) ⇒ throw new TypeConversionException(value, valueType, throwable) + }) } /** diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 5e471c2947..08ee3cf99d 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -97,7 +97,6 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC camel.routes.get(0).getEndpoint.getEndpointUri must be("direct://test") system.stop(consumer) Await.result(camel.deactivationFutureFor(consumer), defaultTimeout) - camel.routeCount must be(0) } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala index a77f7dea77..4134ed35bc 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/DurationConverterTest.scala @@ -4,13 +4,11 @@ package akka.camel.internal.component -import language.postfixOps - import org.scalatest.matchers.MustMatchers import scala.concurrent.util.duration._ import scala.concurrent.util.Duration import org.scalatest.WordSpec -import org.apache.camel.NoTypeConversionAvailableException +import org.apache.camel.{ TypeConversionException, NoTypeConversionAvailableException } class DurationConverterSpec extends WordSpec with MustMatchers { import DurationTypeConverter._ @@ -20,21 +18,21 @@ class DurationConverterSpec extends WordSpec with MustMatchers { } "DurationTypeConverter must do the roundtrip" in { - convertTo(classOf[Duration], DurationTypeConverter.toString(10 seconds)) must be(10 seconds) + convertTo(classOf[Duration], (10 seconds).toString()) must be(10 seconds) } "DurationTypeConverter must throw if invalid format" in { - convertTo(classOf[Duration], "abc nanos") must be === null + tryConvertTo(classOf[Duration], "abc nanos") must be === null - intercept[NoTypeConversionAvailableException] { + intercept[TypeConversionException] { mandatoryConvertTo(classOf[Duration], "abc nanos") must be(10 nanos) }.getValue must be === "abc nanos" } "DurationTypeConverter must throw if doesn't end with time unit" in { - convertTo(classOf[Duration], "10233") must be === null + tryConvertTo(classOf[Duration], "10233") must be === null - intercept[NoTypeConversionAvailableException] { + intercept[TypeConversionException] { mandatoryConvertTo(classOf[Duration], "10233") must be(10 nanos) }.getValue must be === "10233" } diff --git a/akka-camel/src/test/scala/akka/camelexamples/ExamplesSupport.scala b/akka-camel/src/test/scala/akka/camelexamples/ExamplesSupport.scala deleted file mode 100644 index ff84b5d085..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/ExamplesSupport.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camelexamples - -import language.postfixOps - -import akka.camel._ -import scala.concurrent.util.duration._ -import akka.actor.{ Actor, OneForOneStrategy } -import akka.actor.SupervisorStrategy._ - -private[camelexamples] object ExamplesSupport { - val retry3xWithin1s = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second) { - case _: Exception ⇒ Restart - } -} - -private[camelexamples] class SysOutConsumer extends Consumer { - override def activationTimeout = 10 seconds - def endpointUri = "file://data/input/CamelConsumer" - - def receive = { - case msg: CamelMessage ⇒ { - printf("Received '%s'\n", msg.bodyAs[String]) - } - } -} - -private[camelexamples] class TroubleMaker extends Consumer { - def endpointUri = "WRONG URI" - - println("Trying to instantiate conumer with uri: " + endpointUri) - def receive = { case _ ⇒ } -} - -private[camelexamples] class SysOutActor(implicit camel: Camel) extends Actor { - implicit val camelContext = camel.context - def receive = { - case msg: CamelMessage ⇒ { - printf("Received '%s'\n", msg.bodyAs[String]) - } - } -} \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camelexamples/README.txt b/akka-camel/src/test/scala/akka/camelexamples/README.txt deleted file mode 100644 index 5317b283ba..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/README.txt +++ /dev/null @@ -1 +0,0 @@ -This package is outside of akka.camel because we don't want to use private[camel] features in examples. \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camelexamples/RichString.scala b/akka-camel/src/test/scala/akka/camelexamples/RichString.scala deleted file mode 100644 index 1c4443c465..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/RichString.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camelexamples - -import language.implicitConversions - -import java.io.FileWriter - -private[camelexamples] object RichString { - implicit def toRichString(s: String): RichString = new RichString(s) -} - -private[camelexamples] class RichString(s: String) { - def saveAs(fileName: String) = write(fileName, s) - def >>(fileName: String) = this.saveAs(fileName) - def <<(content: String) = write(s, content) - - private[this] def write(fileName: String, content: String) { - val f = new FileWriter(fileName) - f.write(content) - f.close() - } -} - diff --git a/akka-camel/src/test/scala/akka/camelexamples/_1_SimpleConsumer.scala b/akka-camel/src/test/scala/akka/camelexamples/_1_SimpleConsumer.scala deleted file mode 100644 index 13d4d7cfd9..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/_1_SimpleConsumer.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camelexamples - -import akka.actor.{ Props, ActorSystem } -import RichString._ - -private[camelexamples] object _1_SimpleConsumer extends App { - val system = ActorSystem("test") - - system.actorOf(Props[SysOutConsumer]) - - "data/input/CamelConsumer/file1.txt" << "test data " + math.random - - Thread.sleep(2000) - - system.shutdown() - -} \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camelexamples/_2_SupervisedConsumers.scala b/akka-camel/src/test/scala/akka/camelexamples/_2_SupervisedConsumers.scala deleted file mode 100644 index cdf46f012f..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/_2_SupervisedConsumers.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camelexamples - -import akka.actor.{ PoisonPill, Terminated, Props, ActorSystem, Actor } -import ExamplesSupport._ -import RichString._ - -private[camelexamples] object SupervisedConsumersExample extends App { - - val system = ActorSystem("test1") - - system.actorOf(Props(new Actor { - context.watch(context.actorOf(Props[EndpointManager])) - def receive = { - case Terminated(ref) ⇒ system.shutdown() - } - })) - - "data/input/CamelConsumer/file1.txt" << "test data " + math.random -} - -private[camelexamples] class EndpointManager extends Actor { - import context._ - - override def supervisorStrategy() = retry3xWithin1s - - watch(actorOf(Props[SysOutConsumer])) - watch(actorOf(Props[TroubleMaker])) - - def receive = { - case Terminated(ref) ⇒ { - printf("Hey! One of the endpoints has died: %s. I am doing sepuku...\n", ref) - self ! PoisonPill - } - } -} diff --git a/akka-camel/src/test/scala/akka/camelexamples/_3_SimpleActorEndpoint.scala b/akka-camel/src/test/scala/akka/camelexamples/_3_SimpleActorEndpoint.scala deleted file mode 100644 index d9ecdcc0e6..0000000000 --- a/akka-camel/src/test/scala/akka/camelexamples/_3_SimpleActorEndpoint.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.camelexamples - -import org.apache.camel.builder.RouteBuilder -import akka.actor.{ Props, ActorSystem } -import akka.camel._ -import RichString._ - -private[camelexamples] object _3_SimpleActorEndpoint extends App { - - val system = ActorSystem("test") - val camel = CamelExtension(system) - - val actor = system.actorOf(Props[SysOutActor]) - - camel.context.addRoutes(new RouteBuilder() { - def configure() { - from("file://data/input/CamelConsumer").to(actor) - } - }) - - "data/input/CamelConsumer/file1.txt" << "test data " + math.random - - Thread.sleep(3000) - - system.shutdown() - -} \ No newline at end of file diff --git a/akka-docs/java/code/docs/camel/sample/http/HttpProducer.java b/akka-docs/java/code/docs/camel/sample/http/HttpProducer.java index 5fb2dfaba2..27e0cb0df3 100644 --- a/akka-docs/java/code/docs/camel/sample/http/HttpProducer.java +++ b/akka-docs/java/code/docs/camel/sample/http/HttpProducer.java @@ -26,7 +26,7 @@ public class HttpProducer extends UntypedProducerActor{ CamelMessage camelMessage = (CamelMessage) message; Set httpPath = new HashSet(); httpPath.add(Exchange.HTTP_PATH); - return camelMessage.addHeaders(camelMessage.getHeaders(httpPath)); + return camelMessage.withHeaders(camelMessage.getHeaders(httpPath)); } else return super.onTransformOutgoingMessage(message); } diff --git a/akka-docs/java/code/docs/camel/sample/http/HttpTransformer.java b/akka-docs/java/code/docs/camel/sample/http/HttpTransformer.java index 6c897c858b..36a620379f 100644 --- a/akka-docs/java/code/docs/camel/sample/http/HttpTransformer.java +++ b/akka-docs/java/code/docs/camel/sample/http/HttpTransformer.java @@ -10,9 +10,10 @@ public class HttpTransformer extends UntypedActor{ public void onReceive(Object message) { if (message instanceof CamelMessage) { CamelMessage camelMessage = (CamelMessage) message; - CamelMessage replacedMessage = camelMessage.mapBody(new Function(){ - public String apply(String body) { - return body.replaceAll("Akka ", "AKKA "); + CamelMessage replacedMessage = camelMessage.mapBody(new Function(){ + public String apply(Object body) { + String text = new String((byte[])body); + return text.replaceAll("Akka ", "AKKA "); } }); getSender().tell(replacedMessage); diff --git a/akka-docs/scala/code/docs/camel/HttpExample.scala b/akka-docs/scala/code/docs/camel/HttpExample.scala index 55f200d5ef..e5429f391b 100644 --- a/akka-docs/scala/code/docs/camel/HttpExample.scala +++ b/akka-docs/scala/code/docs/camel/HttpExample.scala @@ -29,7 +29,7 @@ object HttpExample { class HttpTransformer extends Actor { def receive = { - case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") }) + case msg: CamelMessage ⇒ sender ! (msg.mapBody { body: Array[Byte] ⇒ new String(body).replaceAll("Akka ", "AKKA ") }) case msg: Failure ⇒ sender ! msg } } diff --git a/akka-samples/akka-sample-camel/README b/akka-samples/akka-sample-camel/README new file mode 100644 index 0000000000..10738ce0ee --- /dev/null +++ b/akka-samples/akka-sample-camel/README @@ -0,0 +1,28 @@ +Camel +=== + +Requirements +------------ + +To build and run Camel you need [Simple Build Tool][sbt] (sbt). + +Running +------- + +First time, 'sbt update' to get dependencies, then use 'sbt run'. +Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': +> cd $AKKA_HOME + +> % sbt + +> > project akka-sample-camel + +> > run + +> > Choose 1 or 2 depending on what sample you wish to run + +Notice +------ + +[akka]: http://akka.io +[sbt]: http://code.google.com/p/simple-build-tool/ \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/AsyncRouteAndTransform.scala b/akka-samples/akka-sample-camel/src/main/scala/AsyncRouteAndTransform.scala new file mode 100644 index 0000000000..5c6f52d595 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/AsyncRouteAndTransform.scala @@ -0,0 +1,49 @@ +import akka.actor.Status.Failure +import akka.actor.{ Actor, ActorRef, Props, ActorSystem } +import akka.camel.{ Producer, CamelMessage, Consumer } +import org.apache.camel.{ Exchange } + +/** + * Asynchronous routing and transformation example + */ +object AsyncRouteAndTransform extends App { + val system = ActorSystem("rewriteAkkaToAKKA") + val httpTransformer = system.actorOf(Props[HttpTransformer], "transformer") + val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer)), "producer") + val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer)), "consumer") +} + +class HttpConsumer(producer: ActorRef) extends Consumer { + def endpointUri = "jetty:http://0.0.0.0:8875/" + def receive = { + case msg ⇒ producer forward msg + } +} + +class HttpProducer(transformer: ActorRef) extends Actor with Producer { + def endpointUri = "jetty://http://akka.io/?bridgeEndpoint=true" + + override def transformOutgoingMessage(msg: Any) = msg match { + case msg: CamelMessage ⇒ msg.withHeaders(msg.headers(Set(Exchange.HTTP_PATH))) + } + + override def routeResponse(msg: Any) { + transformer forward msg + } +} + +class HttpTransformer extends Actor { + def receive = { + case msg: CamelMessage ⇒ + val transformedMsg = msg.mapBody { + (body: Array[Byte]) ⇒ + new String(body).replaceAll("Akka", "AKKA") + // just to make the result look a bit better. + .replaceAll("href=\"/resources", "href=\"http://akka.io/resources") + .replaceAll("src=\"/resources", "src=\"http://akka.io/resources") + } + sender ! transformedMsg + case msg: Failure ⇒ sender ! msg + } +} + diff --git a/akka-samples/akka-sample-camel/src/main/scala/SimpleFileConsumer.scala b/akka-samples/akka-sample-camel/src/main/scala/SimpleFileConsumer.scala new file mode 100644 index 0000000000..909de26813 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/SimpleFileConsumer.scala @@ -0,0 +1,24 @@ +import akka.actor.{ Props, ActorSystem } +import akka.camel.{ CamelMessage, Consumer } +import java.io.File +import org.apache.camel.Exchange + +object SimpleFileConsumer extends App { + val subDir = "consume-files" + val tmpDir = System.getProperty("java.io.tmpdir") + val consumeDir = new File(tmpDir, subDir) + consumeDir.mkdirs() + val tmpDirUri = "file://%s/%s" format (tmpDir, subDir) + + val system = ActorSystem("consume-files") + val fileConsumer = system.actorOf(Props(new FileConsumer(tmpDirUri)), "fileConsumer") + println(String.format("Put a text file in '%s', the consumer will pick it up!", consumeDir)) +} + +class FileConsumer(uri: String) extends Consumer { + def endpointUri = uri + def receive = { + case msg: CamelMessage ⇒ + println("Received file %s with content:\n%s".format(msg.getHeader(Exchange.FILE_NAME), msg.bodyAs[String])) + } +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 2c6aedcaef..c46a04b589 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -282,7 +282,16 @@ object AkkaBuild extends Build { id = "akka-samples", base = file("akka-samples"), settings = parentSettings, - aggregate = Seq(fsmSample, helloSample, helloKernelSample, remoteSample) + aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample) + ) + + lazy val camelSample = Project( + id = "akka-sample-camel", + base = file("akka-samples/akka-sample-camel"), + dependencies = Seq(actor, camel), + settings = defaultSettings ++ Seq( + libraryDependencies ++= Dependencies.camelSample + ) ) lazy val fsmSample = Project( @@ -517,12 +526,15 @@ object Dependencies { val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito) + val camelSample = Seq(CamelSample.camelJetty) + val osgi = Seq(osgiCore,Test.logback, Test.commonsIo, Test.pojosr, Test.tinybundles, Test.scalatest, Test.junit) val osgiAries = Seq(osgiCore, ariesBlueprint, Test.ariesProxy) val tutorials = Seq(Test.scalatest, Test.junit) + val docs = Seq(Test.scalatest, Test.junit, Test.junitIntf) val zeroMQ = Seq(protobuf, zeroMQClient, Test.scalatest, Test.junit) @@ -530,7 +542,7 @@ object Dependencies { object Dependency { // Compile - val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 + val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 val config = "com.typesafe" % "config" % "0.5.0" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.3.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD @@ -557,6 +569,12 @@ object Dependency { val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT } + + // Camel Sample + object CamelSample { + val camelJetty = "org.apache.camel" % "camel-jetty" % "2.10.0" // ApacheV2 + } + } // OSGi settings