diff --git a/akka-camel/src/main/scala/Message.scala b/akka-camel/src/main/scala/Message.scala new file mode 100644 index 0000000000..88f810e045 --- /dev/null +++ b/akka-camel/src/main/scala/Message.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import org.apache.camel.{Message => CamelMessage} +import org.apache.camel.impl.DefaultCamelContext + +import scala.collection.jcl.{Map => MapWrapper} + +/** + * @author Martin Krasser + */ +class Message(val body: Any, val headers: Map[String, Any]) { + + def this(body: Any) = this(body, Map.empty) + + def bodyAs[T](clazz: Class[T]): T = Message.converter.mandatoryConvertTo[T](clazz, body) + +} + +/** + * @author Martin Krasser + */ +object Message { + + val converter = new DefaultCamelContext().getTypeConverter + + def apply(body: Any) = new Message(body) + + def apply(body: Any, headers: Map[String, Any]) = new Message(body, headers) + + def apply(cm: CamelMessage) = + new Message(cm.getBody, Map.empty ++ MapWrapper[String, AnyRef](cm.getHeaders).elements) + +} + +/** + * @author Martin Krasser + */ +class CamelMessageWrapper(val cm: CamelMessage) { + + def from(m: Message): CamelMessage = { + cm.setBody(m.body) + for (h <- m.headers) { + cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef]) + } + cm + } + +} + +/** + * @author Martin Krasser + */ +object CamelMessageWrapper { + + implicit def wrapCamelMessage(cm: CamelMessage): CamelMessageWrapper = new CamelMessageWrapper(cm) + +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index b85cf4829f..d59a4261c9 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -11,6 +11,7 @@ import org.apache.camel.{Exchange, Consumer, Processor} import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} +import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message} /** * Camel component for interacting with actors. @@ -104,25 +105,26 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { } protected def processInOnly(exchange: Exchange, actor: Actor) { - actor ! exchange.getIn + actor ! Message(exchange.getIn) } protected def processInOut(exchange: Exchange, actor: Actor) { - val outmsg = exchange.getOut + + import CamelMessageWrapper._ + // TODO: make timeout configurable - // TODO: send immutable message // TODO: support asynchronous communication // - jetty component: jetty continuations // - file component: completion callbacks - val result: Any = actor !! exchange.getIn + val result: Any = actor !! Message(exchange.getIn) result match { - case Some((body, headers:Map[String, Any])) => { - outmsg.setBody(body) - for (header <- headers) - outmsg.getHeaders.put(header._1, header._2.asInstanceOf[AnyRef]) + case Some(m:Message) => { + exchange.getOut.from(m) + } + case Some(body) => { + exchange.getOut.setBody(body) } - case Some(body) => outmsg.setBody(body) } } diff --git a/akka-camel/src/test/scala/MessageTest.scala b/akka-camel/src/test/scala/MessageTest.scala new file mode 100644 index 0000000000..791f243ee4 --- /dev/null +++ b/akka-camel/src/test/scala/MessageTest.scala @@ -0,0 +1,24 @@ +package se.scalablesolutions.akka.camel.service + +import java.io.InputStream + +import org.apache.camel.NoTypeConversionAvailableException +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.camel.Message + +class MessageTest extends JUnitSuite { + + @Test def shouldConvertDoubleBodyToString = { + assertEquals("1.4", new Message(1.4, null).bodyAs(classOf[String])) + } + + @Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream { + intercept[NoTypeConversionAvailableException] { + new Message(1.4, null).bodyAs(classOf[InputStream]) + } + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/component/ActorComponentTest.scala b/akka-camel/src/test/scala/component/ActorComponentTest.scala index 15a8aa523a..30ea7d1a5b 100644 --- a/akka-camel/src/test/scala/component/ActorComponentTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentTest.scala @@ -1,11 +1,12 @@ package se.scalablesolutions.akka.camel.component -import org.apache.camel.Message -import org.apache.camel.impl.{SimpleRegistry, DefaultCamelContext} import org.junit._ import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Message +import org.apache.camel.{CamelContext, ExchangePattern} +import org.apache.camel.impl.{DefaultExchange, SimpleRegistry, DefaultCamelContext} /** * @author Martin Krasser @@ -25,25 +26,27 @@ class ActorComponentTest extends JUnitSuite { context.stop } - @Test def shouldCommunicateWithActorReferencedById = { - val actor = new ActorComponentTestActor + @Test def shouldReceiveResponseFromActorReferencedById = { + val actor = new TestActor actor.start assertEquals("Hello Martin", template.requestBody("actor:%s" format actor.getId, "Martin")) assertEquals("Hello Martin", template.requestBody("actor:id:%s" format actor.getId, "Martin")) actor.stop } - @Test def shouldCommunicateWithActorReferencedByUuid = { - val actor = new ActorComponentTestActor + @Test def shouldReceiveResponseFromActorReferencedByUuid = { + val actor = new TestActor actor.start assertEquals("Hello Martin", template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")) actor.stop } + class TestActor extends Actor { + protected def receive = { + case msg: Message => reply("Hello %s" format msg.body) + } + } + } -class ActorComponentTestActor extends Actor { - protected def receive = { - case msg: Message => reply("Hello %s" format msg.getBody) - } -} \ No newline at end of file + diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala new file mode 100644 index 0000000000..73e51ebb04 --- /dev/null +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -0,0 +1,40 @@ +package se.scalablesolutions.akka.camel.component + +import org.apache.camel.{CamelContext, ExchangePattern} +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Message +import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange} + +/** + * @author Martin Krasser + */ +class ActorProducerTest extends JUnitSuite { + + val context = new DefaultCamelContext + val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName) + val producer = endpoint.createProducer + + @Test def shouldSendAndReceiveMessageBodyAndHeaders = { + val exchange = new DefaultExchange(null.asInstanceOf[CamelContext], ExchangePattern.InOut) + val actor = new TestActor + actor.start + exchange.getIn.setBody("Martin") + exchange.getIn.setHeader("k1", "v1") + producer.process(exchange) + assertEquals("Hello Martin", exchange.getOut.getBody) + assertEquals("v1", exchange.getOut.getHeader("k1")) + assertEquals("v2", exchange.getOut.getHeader("k2")) + actor.stop + } + + class TestActor extends Actor { + protected def receive = { + case msg: Message => reply(Message("Hello %s" format msg.body, Map("k2" -> "v2") ++ msg.headers)) + } + } + +} diff --git a/akka-camel/src/test/scala/service/CamelServiceTest.scala b/akka-camel/src/test/scala/service/CamelServiceTest.scala index 1e00413b14..52f6d1fd04 100644 --- a/akka-camel/src/test/scala/service/CamelServiceTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceTest.scala @@ -1,15 +1,14 @@ package se.scalablesolutions.akka.camel.service -import org.apache.camel.Message import org.apache.camel.builder.RouteBuilder import org.apache.camel.impl.DefaultCamelContext import org.junit.Assert._ import org.junit.{Before, After, Test} import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.annotation.consume -import se.scalablesolutions.akka.camel.Consumer import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{Message, Consumer} /** * @author Martin Krasser @@ -51,17 +50,17 @@ class CamelServiceTest extends JUnitSuite { service.onUnload } - @Test def shouldCommunicateWithAutoDetectedActor1ViaGeneratedRoute = { + @Test def shouldReceiveResponseFromActor1ViaGeneratedRoute = { val result = template.requestBody("direct:actor1", "Martin") assertEquals("Hello Martin (actor1)", result) } - @Test def shouldCommunicateWithAutoDetectedActor2ViaGeneratedRoute = { + @Test def shouldReceiveResponseFromActor2ViaGeneratedRoute = { val result = template.requestBody("direct:actor2", "Martin") assertEquals("Hello Martin (actor2)", result) } - @Test def shouldCommunicateWithAutoDetectedActor3ViaCustomRoute = { + @Test def shouldReceiveResponseFromActor3ViaCustomRoute = { val result = template.requestBody("direct:actor3", "Martin") assertEquals("Hello Tester (actor3)", result) } @@ -72,14 +71,15 @@ class TestActor1 extends Actor with Consumer { def endpointUri = "direct:actor1" protected def receive = { - case msg: Message => reply("Hello %s (actor1)" format msg.getBody) + case msg: Message => reply("Hello %s (actor1)" format msg.body) } + } @consume("direct:actor2") class TestActor2 extends Actor { protected def receive = { - case msg: Message => reply("Hello %s (actor2)" format msg.getBody) + case msg: Message => reply("Hello %s (actor2)" format msg.body) } } @@ -87,7 +87,7 @@ class TestActor3 extends Actor { id = "actor3" protected def receive = { - case msg: Message => reply("Hello %s (actor3)" format msg.getBody) + case msg: Message => reply("Hello %s (actor3)" format msg.body) } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala index 4e07866393..b292d6e186 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Consumer1.scala @@ -1,10 +1,8 @@ package sample.camel -import org.apache.camel.Message - import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.camel.Consumer +import se.scalablesolutions.akka.camel.{Message, Consumer} /** * @author Martin Krasser @@ -14,7 +12,7 @@ class Consumer1 extends Actor with Consumer with Logging { def endpointUri = "file:data/input" def receive = { - case msg: Message => log.info("received %s" format msg.getBody(classOf[String])) + case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String])) } } \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala index aa9cd5e612..4940c46f0d 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Consumer2.scala @@ -1,9 +1,8 @@ package sample.camel -import org.apache.camel.Message - import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.Message /** * @author Martin Krasser @@ -12,7 +11,7 @@ import se.scalablesolutions.akka.annotation.consume class Consumer2 extends Actor { def receive = { - case msg: Message => reply("Hello %s" format msg.getBody(classOf[String])) + case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String])) } } \ No newline at end of file