Merge pull request #448 from akka/ticket_1732
various comments processed on top of ticket 1732
This commit is contained in:
commit
ca1baebfbb
6 changed files with 21 additions and 92 deletions
|
|
@ -20,7 +20,7 @@ trait Activation {
|
|||
|
||||
def system: ActorSystem
|
||||
|
||||
private val activationTracker = system.actorOf(Props[ActivationTracker])
|
||||
private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker")
|
||||
|
||||
/**
|
||||
* Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires.
|
||||
|
|
|
|||
|
|
@ -235,7 +235,7 @@ object CamelMessage {
|
|||
*/
|
||||
case object Ack {
|
||||
/** Java API to get the Ack singleton */
|
||||
def ack = this
|
||||
def getInstance = this
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -26,11 +26,6 @@ trait Consumer extends Actor with ConsumerConfig {
|
|||
camel.registerConsumer(endpointUri, this, activationTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
*/
|
||||
private[camel] object DefaultConsumerConfig extends ConsumerConfig
|
||||
|
||||
trait ConsumerConfig {
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import akka.util.duration._
|
|||
import java.util.concurrent.{ TimeoutException, CountDownLatch }
|
||||
import akka.camel.internal.CamelExchangeAdapter
|
||||
import akka.util.{ NonFatal, Duration, Timeout }
|
||||
import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
|
||||
import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
|
|
@ -229,20 +229,22 @@ private[camel] object DurationTypeConverter extends TypeConverter {
|
|||
* @param actorPath the String representation of the path to the actor
|
||||
*/
|
||||
private[camel] case class ActorEndpointPath private (actorPath: String) {
|
||||
import ActorEndpointPath._
|
||||
require(actorPath != null)
|
||||
require(actorPath.length() > 0)
|
||||
def toCamelPath(config: ConsumerConfig = DefaultConsumerConfig): String = "actor://path:%s?%s" format (actorPath, config.toCamelParameters)
|
||||
def toCamelPath(config: ConsumerConfig = consumerConfig): String = "actor://path:%s?%s" format (actorPath, config.toCamelParameters)
|
||||
|
||||
def findActorIn(system: ActorSystem): Option[ActorRef] = {
|
||||
val ref = system.actorFor(actorPath)
|
||||
if (ref.isTerminated) None else Some(ref)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal use only. Companion of `ActorEndpointPath`
|
||||
*/
|
||||
private[camel] object ActorEndpointPath {
|
||||
private val consumerConfig = new ConsumerConfig {}
|
||||
|
||||
def apply(actorRef: ActorRef) = new ActorEndpointPath(actorRef.path.toString)
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -9,13 +9,10 @@ import akka.camel._
|
|||
import org.apache.camel.{ ProducerTemplate, CamelContext }
|
||||
|
||||
/**
|
||||
* Java-friendly Consumer.
|
||||
*
|
||||
* @see UntypedConsumerActor
|
||||
*
|
||||
* @author Martin Krasser
|
||||
* Subclass this abstract class to create an MDB-style untyped consumer actor. This
|
||||
* class is meant to be used from Java.
|
||||
*/
|
||||
trait UntypedConsumer extends Consumer { self: UntypedActor ⇒
|
||||
abstract class UntypedConsumerActor extends UntypedActor with Consumer {
|
||||
final def endpointUri = getEndpointUri
|
||||
|
||||
/**
|
||||
|
|
@ -23,13 +20,6 @@ trait UntypedConsumer extends Consumer { self: UntypedActor ⇒
|
|||
*/
|
||||
def getEndpointUri(): String
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass this abstract class to create an MDB-style untyped consumer actor. This
|
||||
* class is meant to be used from Java.
|
||||
*/
|
||||
abstract class UntypedConsumerActor extends UntypedActor with UntypedConsumer {
|
||||
/**
|
||||
* Returns the [[org.apache.camel.CamelContext]]
|
||||
* @return the CamelContext
|
||||
|
|
|
|||
|
|
@ -10,18 +10,18 @@ import org.apache.camel.component.mock.MockEndpoint
|
|||
import akka.dispatch.Await
|
||||
import akka.camel.TestSupport.SharedCamelSystem
|
||||
import akka.actor.SupervisorStrategy.Stop
|
||||
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
|
||||
import akka.actor._
|
||||
import akka.pattern._
|
||||
import akka.util.duration._
|
||||
import akka.util.Timeout
|
||||
import akka.testkit.TestLatch
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit.TestLatch
|
||||
|
||||
/**
|
||||
* Tests the features of the Camel Producer.
|
||||
*/
|
||||
class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen with MustMatchers {
|
||||
class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with MustMatchers {
|
||||
|
||||
import ProducerFeatureTest._
|
||||
|
||||
|
|
@ -36,14 +36,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
override protected def afterEach { mockEndpoint.reset() }
|
||||
|
||||
"A Producer on a sync Camel route" must {
|
||||
|
||||
"produce a message and receive normal response" in {
|
||||
given("a registered two-way producer")
|
||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)))
|
||||
when("a test message is sent to the producer with ?")
|
||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
val future = producer.ask(message)(timeoutDuration)
|
||||
then("a normal response must have been returned by the producer")
|
||||
val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
Await.result(future, timeoutDuration) match {
|
||||
case result: CamelMessage ⇒ assert(result === expected)
|
||||
|
|
@ -52,7 +48,6 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce a message and receive failure response" in {
|
||||
given("a registered two-way producer")
|
||||
val latch = TestLatch()
|
||||
var deadActor: Option[ActorRef] = None
|
||||
val supervisor = system.actorOf(Props(new Actor {
|
||||
|
|
@ -72,44 +67,31 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
}))
|
||||
val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration)
|
||||
|
||||
when("a test message causing an exception is sent to the producer with ?")
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
then("a failure response must have been returned by the producer")
|
||||
// a failure response must have been returned by the producer
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
}
|
||||
then("an AkkaCamelException must have been thrown, which can be used for supervision")
|
||||
// check that the supervisor stopped the producer and received a Terminated
|
||||
Await.ready(latch, timeoutDuration)
|
||||
deadActor must be(Some(producer))
|
||||
}
|
||||
|
||||
"produce a message oneway" in {
|
||||
given("a registered one-way producer")
|
||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway))
|
||||
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("TEST")
|
||||
producer ! CamelMessage("test", Map())
|
||||
|
||||
then("the test message must have been sent to mock:mock")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
|
||||
"produces message twoway without sender reference" in {
|
||||
given("a registered two-way producer")
|
||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")))
|
||||
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("test")
|
||||
producer ! CamelMessage("test", Map())
|
||||
|
||||
then("there must be only a warning that there's no sender reference")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
}
|
||||
|
|
@ -117,16 +99,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
"A Producer on an async Camel route" must {
|
||||
|
||||
"produce message to direct:producer-test-3 and receive normal response" in {
|
||||
given("a registered two-way producer")
|
||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
|
||||
|
||||
when("a test message is sent to the producer with ?")
|
||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
val future = producer.ask(message)(timeoutDuration)
|
||||
|
||||
Await.result(future, timeoutDuration) match {
|
||||
case result: CamelMessage ⇒
|
||||
then("a normal response must have been returned by the producer")
|
||||
// a normal response must have been returned by the producer
|
||||
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
result must be(expected)
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
|
|
@ -134,16 +113,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message to direct:producer-test-3 and receive failure response" in {
|
||||
given("a registered two-way producer")
|
||||
val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")))
|
||||
|
||||
when("a test message causing an exception is sent to the producer with ?")
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
then("a failure response must have been returned by the producer")
|
||||
// a failure response must have been returned by the producer
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
|
|
@ -151,17 +127,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ReplyingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||
|
||||
when("a test message is sent to the producer with ?")
|
||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
val future = producer.ask(message)(timeoutDuration)
|
||||
|
||||
Await.result(future, timeoutDuration) match {
|
||||
case result: CamelMessage ⇒
|
||||
then("a normal response must have been returned by the forward target")
|
||||
// a normal response must have been returned by the forward target
|
||||
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
|
||||
result must be(expected)
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
|
|
@ -169,16 +142,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ReplyingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||
|
||||
when("a test message causing an exception is sent to the producer with ?")
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
then("a failure response must have been returned by the forward target")
|
||||
// a failure response must have been returned by the forward target
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
|
|
@ -186,44 +157,28 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("received test")
|
||||
producer.tell(CamelMessage("test", Map()), producer)
|
||||
|
||||
then("a normal response must have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
|
||||
"produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
|
||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)))
|
||||
|
||||
when("a test message causing an exception is sent to the producer with !")
|
||||
mockEndpoint.expectedMessageCount(1)
|
||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||
producer.tell(CamelMessage("fail", Map()), producer)
|
||||
|
||||
then("a failure response must have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
|
||||
"produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ReplyingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||
|
||||
when("a test message is sent to the producer with ?")
|
||||
val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
|
||||
val future = producer.ask(message)(timeoutDuration)
|
||||
|
||||
then("a normal response must have been returned by the forward target")
|
||||
Await.result(future, timeoutDuration) match {
|
||||
case message: CamelMessage ⇒
|
||||
val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))
|
||||
|
|
@ -233,16 +188,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
|
||||
given("a registered two-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ReplyingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||
|
||||
when("a test message causing an exception is sent to the producer with ask")
|
||||
val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
val future = producer.ask(message)(timeoutDuration).failed
|
||||
Await.ready(future, timeoutDuration).value match {
|
||||
case Some(Right(e: AkkaCamelException)) ⇒
|
||||
then("a failure response must have been returned by the forward target")
|
||||
e.getMessage must be("failure")
|
||||
e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
|
||||
case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected)
|
||||
|
|
@ -250,29 +202,19 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd
|
|||
}
|
||||
|
||||
"produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||
|
||||
when("a test message is sent to the producer with !")
|
||||
mockEndpoint.expectedBodiesReceived("received test")
|
||||
producer.tell(CamelMessage("test", Map()), producer)
|
||||
|
||||
then("a normal response must have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
|
||||
"produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
|
||||
given("a registered one-way producer configured with a forward target")
|
||||
val target = system.actorOf(Props[ProducingForwardTarget])
|
||||
val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)))
|
||||
|
||||
when("a test message causing an exception is sent to the producer with !")
|
||||
mockEndpoint.expectedMessageCount(1)
|
||||
mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
|
||||
producer.tell(CamelMessage("fail", Map()), producer)
|
||||
|
||||
then("a failure response must have been produced by the forward target")
|
||||
mockEndpoint.assertIsSatisfied()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue