use immutable messages for communication with actors

This commit is contained in:
Martin Krasser 2010-03-01 13:35:11 +01:00
parent 15b9787ca0
commit 3b62a7a658
8 changed files with 163 additions and 36 deletions

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import org.apache.camel.{Message => CamelMessage}
import org.apache.camel.impl.DefaultCamelContext
import scala.collection.jcl.{Map => MapWrapper}
/**
* @author Martin Krasser
*/
class Message(val body: Any, val headers: Map[String, Any]) {
def this(body: Any) = this(body, Map.empty)
def bodyAs[T](clazz: Class[T]): T = Message.converter.mandatoryConvertTo[T](clazz, body)
}
/**
* @author Martin Krasser
*/
object Message {
val converter = new DefaultCamelContext().getTypeConverter
def apply(body: Any) = new Message(body)
def apply(body: Any, headers: Map[String, Any]) = new Message(body, headers)
def apply(cm: CamelMessage) =
new Message(cm.getBody, Map.empty ++ MapWrapper[String, AnyRef](cm.getHeaders).elements)
}
/**
* @author Martin Krasser
*/
class CamelMessageWrapper(val cm: CamelMessage) {
def from(m: Message): CamelMessage = {
cm.setBody(m.body)
for (h <- m.headers) {
cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef])
}
cm
}
}
/**
* @author Martin Krasser
*/
object CamelMessageWrapper {
implicit def wrapCamelMessage(cm: CamelMessage): CamelMessageWrapper = new CamelMessageWrapper(cm)
}

View file

@ -11,6 +11,7 @@ import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message}
/**
* Camel component for interacting with actors.
@ -104,25 +105,26 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
}
protected def processInOnly(exchange: Exchange, actor: Actor) {
actor ! exchange.getIn
actor ! Message(exchange.getIn)
}
protected def processInOut(exchange: Exchange, actor: Actor) {
val outmsg = exchange.getOut
import CamelMessageWrapper._
// TODO: make timeout configurable
// TODO: send immutable message
// TODO: support asynchronous communication
// - jetty component: jetty continuations
// - file component: completion callbacks
val result: Any = actor !! exchange.getIn
val result: Any = actor !! Message(exchange.getIn)
result match {
case Some((body, headers:Map[String, Any])) => {
outmsg.setBody(body)
for (header <- headers)
outmsg.getHeaders.put(header._1, header._2.asInstanceOf[AnyRef])
case Some(m:Message) => {
exchange.getOut.from(m)
}
case Some(body) => {
exchange.getOut.setBody(body)
}
case Some(body) => outmsg.setBody(body)
}
}

View file

@ -0,0 +1,24 @@
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import org.apache.camel.NoTypeConversionAvailableException
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Message
class MessageTest extends JUnitSuite {
@Test def shouldConvertDoubleBodyToString = {
assertEquals("1.4", new Message(1.4, null).bodyAs(classOf[String]))
}
@Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream {
intercept[NoTypeConversionAvailableException] {
new Message(1.4, null).bodyAs(classOf[InputStream])
}
}
}

View file

@ -1,11 +1,12 @@
package se.scalablesolutions.akka.camel.component
import org.apache.camel.Message
import org.apache.camel.impl.{SimpleRegistry, DefaultCamelContext}
import org.junit._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
import org.apache.camel.{CamelContext, ExchangePattern}
import org.apache.camel.impl.{DefaultExchange, SimpleRegistry, DefaultCamelContext}
/**
* @author Martin Krasser
@ -25,25 +26,27 @@ class ActorComponentTest extends JUnitSuite {
context.stop
}
@Test def shouldCommunicateWithActorReferencedById = {
val actor = new ActorComponentTestActor
@Test def shouldReceiveResponseFromActorReferencedById = {
val actor = new TestActor
actor.start
assertEquals("Hello Martin", template.requestBody("actor:%s" format actor.getId, "Martin"))
assertEquals("Hello Martin", template.requestBody("actor:id:%s" format actor.getId, "Martin"))
actor.stop
}
@Test def shouldCommunicateWithActorReferencedByUuid = {
val actor = new ActorComponentTestActor
@Test def shouldReceiveResponseFromActorReferencedByUuid = {
val actor = new TestActor
actor.start
assertEquals("Hello Martin", template.requestBody("actor:uuid:%s" format actor.uuid, "Martin"))
actor.stop
}
class TestActor extends Actor {
protected def receive = {
case msg: Message => reply("Hello %s" format msg.body)
}
}
}
class ActorComponentTestActor extends Actor {
protected def receive = {
case msg: Message => reply("Hello %s" format msg.getBody)
}
}

View file

@ -0,0 +1,40 @@
package se.scalablesolutions.akka.camel.component
import org.apache.camel.{CamelContext, ExchangePattern}
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange}
/**
* @author Martin Krasser
*/
class ActorProducerTest extends JUnitSuite {
val context = new DefaultCamelContext
val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName)
val producer = endpoint.createProducer
@Test def shouldSendAndReceiveMessageBodyAndHeaders = {
val exchange = new DefaultExchange(null.asInstanceOf[CamelContext], ExchangePattern.InOut)
val actor = new TestActor
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
producer.process(exchange)
assertEquals("Hello Martin", exchange.getOut.getBody)
assertEquals("v1", exchange.getOut.getHeader("k1"))
assertEquals("v2", exchange.getOut.getHeader("k2"))
actor.stop
}
class TestActor extends Actor {
protected def receive = {
case msg: Message => reply(Message("Hello %s" format msg.body, Map("k2" -> "v2") ++ msg.headers))
}
}
}

View file

@ -1,15 +1,14 @@
package se.scalablesolutions.akka.camel.service
import org.apache.camel.Message
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.junit.Assert._
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.{Message, Consumer}
/**
* @author Martin Krasser
@ -51,17 +50,17 @@ class CamelServiceTest extends JUnitSuite {
service.onUnload
}
@Test def shouldCommunicateWithAutoDetectedActor1ViaGeneratedRoute = {
@Test def shouldReceiveResponseFromActor1ViaGeneratedRoute = {
val result = template.requestBody("direct:actor1", "Martin")
assertEquals("Hello Martin (actor1)", result)
}
@Test def shouldCommunicateWithAutoDetectedActor2ViaGeneratedRoute = {
@Test def shouldReceiveResponseFromActor2ViaGeneratedRoute = {
val result = template.requestBody("direct:actor2", "Martin")
assertEquals("Hello Martin (actor2)", result)
}
@Test def shouldCommunicateWithAutoDetectedActor3ViaCustomRoute = {
@Test def shouldReceiveResponseFromActor3ViaCustomRoute = {
val result = template.requestBody("direct:actor3", "Martin")
assertEquals("Hello Tester (actor3)", result)
}
@ -72,14 +71,15 @@ class TestActor1 extends Actor with Consumer {
def endpointUri = "direct:actor1"
protected def receive = {
case msg: Message => reply("Hello %s (actor1)" format msg.getBody)
case msg: Message => reply("Hello %s (actor1)" format msg.body)
}
}
@consume("direct:actor2")
class TestActor2 extends Actor {
protected def receive = {
case msg: Message => reply("Hello %s (actor2)" format msg.getBody)
case msg: Message => reply("Hello %s (actor2)" format msg.body)
}
}
@ -87,7 +87,7 @@ class TestActor3 extends Actor {
id = "actor3"
protected def receive = {
case msg: Message => reply("Hello %s (actor3)" format msg.getBody)
case msg: Message => reply("Hello %s (actor3)" format msg.body)
}
}