Closes #392 Support untyped Java actors as endpoint producer

This commit is contained in:
Martin Krasser 2010-08-14 11:31:15 +02:00
parent 4b7ce4e3e3
commit 979cd371e6
8 changed files with 218 additions and 19 deletions

View file

@ -30,14 +30,16 @@ trait Consumer { self: Actor =>
* Java-friendly {@link Consumer} inherited by
*
* <ul>
* <li>{@link UntypedConsumer}</li>
* <li>{@link RemoteUntypedConsumer}</li>
* <li>{@link TransactedUntypedConsumer}</li>
* <li>{@link UntypedConsumerActor}</li>
* <li>{@link RemoteUntypedConsumerActor}</li>
* <li>{@link UntypedConsumerTransactor}</li>
* </ul>
*
* implementations.
*
* @author Martin Krasser
*/
trait JavaConsumer extends Consumer { self: Actor =>
trait UntypedConsumer extends Consumer { self: UntypedActor =>
final override def endpointUri = getEndpointUri
@ -60,19 +62,19 @@ trait JavaConsumer extends Consumer { self: Actor =>
* Subclass this abstract class to create an MDB-style untyped consumer actor. This
* class is meant to be used from Java.
*/
abstract class UntypedConsumer extends UntypedActor with JavaConsumer
abstract class UntypedConsumerActor extends UntypedActor with UntypedConsumer
/**
* Subclass this abstract class to create an MDB-style transacted untyped consumer
* actor. This class is meant to be used from Java.
*/
abstract class TransactedUntypedConsumer extends UntypedTransactor with JavaConsumer
abstract class UntypedConsumerTransactor extends UntypedTransactor with UntypedConsumer
/**
* Subclass this abstract class to create an MDB-style remote untyped consumer
* actor. This class is meant to be used from Java.
*/
abstract class RemoteUntypedConsumer(address: InetSocketAddress) extends RemoteUntypedActor(address) with JavaConsumer {
abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends RemoteUntypedActor(address) with UntypedConsumer {
def this(host: String, port: Int) = this(new InetSocketAddress(host, port))
}

View file

@ -9,14 +9,14 @@ import CamelMessageConversion.toExchangeAdapter
import org.apache.camel._
import org.apache.camel.processor.SendProcessor
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, UntypedActor}
/**
* Mixed in by Actor implementations that produce messages to Camel endpoints.
* Support trait for producing messages to Camel endpoints.
*
* @author Martin Krasser
*/
trait Producer { this: Actor =>
trait ProducerSupport { this: Actor =>
/**
* Message headers to copy by default from request message to response-message.
@ -141,11 +141,6 @@ trait Producer { this: Actor =>
case msg => if (!oneway) self.reply(msg)
}
/**
* Default implementation of Actor.receive
*/
protected def receive = produce
/**
* Creates a new Exchange with given <code>pattern</code> from the endpoint specified by
* <code>endpointUri</code>.
@ -162,6 +157,78 @@ trait Producer { this: Actor =>
}
}
/**
* Mixed in by Actor implementations that produce messages to Camel endpoints.
*/
trait Producer extends ProducerSupport { this: Actor =>
/**
* Default implementation of Actor.receive
*/
protected def receive = produce
}
/**
* Java-friendly {@link ProducerSupport} inherited by {@link UntypedProducerActor} implementations.
*
* @author Martin Krasser
*/
trait UntypedProducer extends ProducerSupport { this: UntypedActor =>
final override def endpointUri = getEndpointUri
final override def oneway = isOneway
final override def receiveBeforeProduce = {
case msg => onReceiveBeforeProduce(msg)
}
final override def receiveAfterProduce = {
case msg => onReceiveAfterProduce(msg)
}
/**
* Default implementation of UntypedActor.onReceive
*/
def onReceive(message: Any) = produce(message)
/**
* Returns the Camel endpoint URI to produce messages to.
*/
def getEndpointUri(): String
/**
* If set to false (default), this producer expects a response message from the Camel endpoint.
* If set to true, this producer communicates with the Camel endpoint with an in-only message
* exchange pattern (fire and forget).
*/
def isOneway() = super.oneway
/**
* Called before the message is sent to the endpoint specified by <code>getEndpointUri</code>. The original
* message is passed as argument. By default, this method simply returns the argument but may be overridden
* by subclasses.
*/
@throws(classOf[Exception])
def onReceiveBeforeProduce(message: Any): Any = super.receiveBeforeProduce(message)
/**
* Called after the a result was received from the endpoint specified by <code>getEndpointUri</code>. The
* result is passed as argument. By default, this method replies the result back to the original sender
* if <code>isOneway</code> returns false. If <code>isOneway</code> returns true then nothing is done. This
* method may be overridden by subclasses.
*/
@throws(classOf[Exception])
def onReceiveAfterProduce(message: Any): Unit = super.receiveAfterProduce(message)
}
/**
* Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java.
*
* @author Martin Krasser
*/
abstract class UntypedProducerActor extends UntypedActor with UntypedProducer
/**
* @author Martin Krasser
*/

View file

@ -1,11 +1,11 @@
package se.scalablesolutions.akka.camel;
import java.net.InetSocketAddress;
import se.scalablesolutions.akka.camel.RemoteUntypedConsumerActor;
/**
* @author Martin Krasser
*/
public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumer {
public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor {
public SampleRemoteUntypedConsumer() {
this("localhost", 7774);

View file

@ -1,9 +1,11 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.camel.UntypedConsumerActor;
/**
* @author Martin Krasser
*/
public class SampleUntypedConsumer extends UntypedConsumer {
public class SampleUntypedConsumer extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:test-untyped-consumer";

View file

@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel;
/**
* @author Martin Krasser
*/
public class SampleUntypedConsumerBlocking extends UntypedConsumer {
public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
public String getEndpointUri() {
return "direct:test-untyped-consumer-blocking";

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka.camel;
/**
* @author Martin Krasser
*/
public class SampleUntypedForwardingProducer extends UntypedProducerActor {
public String getEndpointUri() {
return "direct:producer-test-1";
}
@Override
public void onReceiveAfterProduce(Object message) {
Message msg = (Message)message;
String body = msg.bodyAs(String.class);
CamelContextManager.template().sendBody("direct:forward-test-1", body);
}
}

View file

@ -0,0 +1,12 @@
package se.scalablesolutions.akka.camel;
/**
* @author Martin Krasser
*/
public class SampleUntypedReplyingProducer extends UntypedProducerActor {
public String getEndpointUri() {
return "direct:producer-test-1";
}
}

View file

@ -0,0 +1,98 @@
package se.scalablesolutions.akka.camel
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.UntypedActor._
import se.scalablesolutions.akka.actor.ActorRegistry
class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
import UntypedProducerFeatureTest._
override protected def beforeAll = {
ActorRegistry.shutdownAll
CamelContextManager.init
CamelContextManager.context.addRoutes(new TestRoute)
CamelContextManager.start
}
override protected def afterAll = {
CamelContextManager.stop
ActorRegistry.shutdownAll
}
override protected def afterEach = {
mockEndpoint.reset
}
feature("Produce a message to a sync Camel route") {
scenario("produce message and receive normal response") {
given("a registered two-way producer")
val producer = actorOf(classOf[SampleUntypedReplyingProducer])
producer.start
when("a test message is sent to the producer with !!")
val message = Message("test", Map(Message.MessageExchangeId -> "123"))
val result = producer.sendRequestReply(message)
then("a normal response should have been returned by the producer")
val expected = Message("received test", Map(Message.MessageExchangeId -> "123"))
assert(result === expected)
}
scenario("produce message and receive failure response") {
given("a registered two-way producer")
val producer = actorOf(classOf[SampleUntypedReplyingProducer])
producer.start
when("a test message causing an exception is sent to the producer with !!")
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = producer.sendRequestReply(message).asInstanceOf[Failure]
then("a failure response should have been returned by the producer")
val expectedFailureText = result.cause.getMessage
val expectedHeaders = result.headers
assert(expectedFailureText === "failure")
assert(expectedHeaders === Map(Message.MessageExchangeId -> "123"))
}
}
feature("Produce a message to a sync Camel route and then forward the response") {
scenario("produce message and send normal response to direct:forward-test-1") {
given("a registered one-way producer configured with a forward target")
val producer = actorOf(classOf[SampleUntypedForwardingProducer])
producer.start
when("a test message is sent to the producer with !")
mockEndpoint.expectedBodiesReceived("received test")
val result = producer.sendOneWay(Message("test"), producer)
then("a normal response should have been sent")
mockEndpoint.assertIsSatisfied
}
}
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
}
object UntypedProducerFeatureTest {
class TestRoute extends RouteBuilder {
def configure {
from("direct:forward-test-1").to("mock:mock")
from("direct:producer-test-1").process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case "fail" => throw new Exception("failure")
case body => exchange.getOut.setBody("received %s" format body)
}
}
})
}
}
}