Merge pull request #385 from akka/wip-1970-camel-producer-converters
Wip 1970 camel producer converters
This commit is contained in:
commit
b43f0119fc
4 changed files with 30 additions and 23 deletions
|
|
@ -88,13 +88,13 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* @see Producer#produce(Any, ExchangePattern)
|
||||
*/
|
||||
protected def produce: Receive = {
|
||||
case res: MessageResult ⇒ receiveAfterProduce(res.message)
|
||||
case res: FailureResult ⇒ receiveAfterProduce(res.failure)
|
||||
case res: MessageResult ⇒ routeResponse(res.message)
|
||||
case res: FailureResult ⇒ routeResponse(res.failure)
|
||||
case msg ⇒ {
|
||||
if (oneway)
|
||||
produce(receiveBeforeProduce(msg), ExchangePattern.InOnly)
|
||||
produce(transformOutgoingMessage(msg), ExchangePattern.InOnly)
|
||||
else
|
||||
produce(receiveBeforeProduce(msg), ExchangePattern.InOut)
|
||||
produce(transformOutgoingMessage(msg), ExchangePattern.InOut)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -103,9 +103,14 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subtraits or subclasses.
|
||||
*/
|
||||
protected def receiveBeforeProduce: PartialFunction[Any, Any] = {
|
||||
case msg ⇒ msg
|
||||
}
|
||||
protected def transformOutgoingMessage(msg: Any): Any = msg
|
||||
|
||||
/**
|
||||
* Called before the response message is sent to the original sender. The original
|
||||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subtraits or subclasses.
|
||||
*/
|
||||
protected def transformResponse(msg: Any): Any = msg
|
||||
|
||||
/**
|
||||
* Called after a response was received from the endpoint specified by <code>endpointUri</code>. The
|
||||
|
|
@ -114,8 +119,9 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* done. This method may be overridden by subtraits or subclasses (e.g. to forward responses to another
|
||||
* actor).
|
||||
*/
|
||||
protected def receiveAfterProduce: Receive = {
|
||||
case msg ⇒ if (!oneway) sender ! msg
|
||||
|
||||
protected def routeResponse(msg: Any) {
|
||||
if (!oneway) sender ! transformResponse(msg)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,14 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
|
|||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subclasses.
|
||||
*/
|
||||
def onReceiveBeforeProduce(message: AnyRef): AnyRef = message
|
||||
def onTransformOutgoingMessage(message: AnyRef): AnyRef = message
|
||||
|
||||
/**
|
||||
* Called before the response message is sent to original sender. The original
|
||||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subclasses.
|
||||
*/
|
||||
def onTransformResponse(message: AnyRef): AnyRef = message
|
||||
|
||||
/**
|
||||
* Called after a response was received from the endpoint specified by <code>endpointUri</code>. The
|
||||
|
|
@ -27,15 +34,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
|
|||
* if <code>oneway</code> is <code>false</code>. If <code>oneway</code> is <code>true</code>, nothing is
|
||||
* done. This method may be overridden by subclasses (e.g. to forward responses to another actor).
|
||||
*/
|
||||
def onReceiveAfterProduce(message: AnyRef): Unit = super.receiveAfterProduce(message)
|
||||
def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message)
|
||||
|
||||
final override def receiveBeforeProduce = {
|
||||
case msg: AnyRef ⇒ onReceiveBeforeProduce(msg)
|
||||
}
|
||||
|
||||
final override def receiveAfterProduce = {
|
||||
case msg: AnyRef ⇒ onReceiveAfterProduce(msg)
|
||||
}
|
||||
final override def transformOutgoingMessage(msg: Any): AnyRef = onTransformOutgoingMessage(msg.asInstanceOf[AnyRef])
|
||||
final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef])
|
||||
final override def routeResponse(msg: Any): Any = onRouteResponse(msg.asInstanceOf[AnyRef])
|
||||
|
||||
final override def endpointUri = getEndpointUri
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveAfterProduce(Object message) {
|
||||
public void onRouteResponse(Object message) {
|
||||
CamelMessage msg = (CamelMessage)message;
|
||||
String body = msg.getBodyAs(String.class,getCamelContext());
|
||||
getProducerTemplate().sendBody("direct:forward-test-1", body);
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ object ProducerFeatureTest {
|
|||
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
override protected def receiveBeforeProduce = {
|
||||
override protected def transformOutgoingMessage(msg: Any) = msg match {
|
||||
case msg: CamelMessage ⇒ if (upper) msg.mapBody {
|
||||
body: String ⇒ body.toUpperCase
|
||||
}
|
||||
|
|
@ -277,9 +277,7 @@ object ProducerFeatureTest {
|
|||
class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
override protected def receiveAfterProduce = {
|
||||
case msg ⇒ target forward msg
|
||||
}
|
||||
override protected def routeResponse(msg: Any): Any = { target forward msg }
|
||||
}
|
||||
|
||||
class TestResponder extends Actor {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue