diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index 75bc85bb23..2ea046b856 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -235,7 +235,7 @@ object CamelMessage { */ case object Ack { /** Java API to get the Ack singleton */ - def ack = this + def getInstance = this } /** diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 7c0119ef5e..1d21ffbec7 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -26,11 +26,6 @@ trait Consumer extends Actor with ConsumerConfig { camel.registerConsumer(endpointUri, this, activationTimeout) } -/** - * For internal use only. - */ -private[camel] object DefaultConsumerConfig extends ConsumerConfig - trait ConsumerConfig { /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index e637914a2a..7ec5919dc9 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -17,7 +17,7 @@ import akka.util.duration._ import java.util.concurrent.{ TimeoutException, CountDownLatch } import akka.camel.internal.CamelExchangeAdapter import akka.util.{ NonFatal, Duration, Timeout } -import akka.camel.{ ActorNotRegisteredException, DefaultConsumerConfig, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } +import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } /** * For internal use only. @@ -171,17 +171,17 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { - case Right(Ack) ⇒ { /* no response message to set */ } + case Right(Ack) ⇒ { /* no response message to set */ } case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = { @@ -229,20 +229,22 @@ private[camel] object DurationTypeConverter extends TypeConverter { * @param actorPath the String representation of the path to the actor */ private[camel] case class ActorEndpointPath private (actorPath: String) { + import ActorEndpointPath._ require(actorPath != null) require(actorPath.length() > 0) - def toCamelPath(config: ConsumerConfig = DefaultConsumerConfig): String = "actor://path:%s?%s" format (actorPath, config.toCamelParameters) + def toCamelPath(config: ConsumerConfig = consumerConfig): String = "actor://path:%s?%s" format (actorPath, config.toCamelParameters) def findActorIn(system: ActorSystem): Option[ActorRef] = { val ref = system.actorFor(actorPath) if (ref.isTerminated) None else Some(ref) } } - /** * For internal use only. Companion of `ActorEndpointPath` */ private[camel] object ActorEndpointPath { + private val consumerConfig = new ConsumerConfig {} + def apply(actorRef: ActorRef) = new ActorEndpointPath(actorRef.path.toString) /** diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala index 9ed17340d1..56f11831d0 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala @@ -9,13 +9,10 @@ import akka.camel._ import org.apache.camel.{ ProducerTemplate, CamelContext } /** - * Java-friendly Consumer. - * - * @see UntypedConsumerActor - * - * @author Martin Krasser + * Subclass this abstract class to create an MDB-style untyped consumer actor. This + * class is meant to be used from Java. */ -trait UntypedConsumer extends Consumer { self: UntypedActor ⇒ +abstract class UntypedConsumerActor extends UntypedActor with Consumer { final def endpointUri = getEndpointUri /** @@ -23,13 +20,6 @@ trait UntypedConsumer extends Consumer { self: UntypedActor ⇒ */ def getEndpointUri(): String -} - -/** - * Subclass this abstract class to create an MDB-style untyped consumer actor. This - * class is meant to be used from Java. - */ -abstract class UntypedConsumerActor extends UntypedActor with UntypedConsumer { /** * Returns the [[org.apache.camel.CamelContext]] * @return the CamelContext diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 477e393026..cef098b8fe 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -10,18 +10,18 @@ import org.apache.camel.component.mock.MockEndpoint import akka.dispatch.Await import akka.camel.TestSupport.SharedCamelSystem import akka.actor.SupervisorStrategy.Stop -import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } +import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } import akka.actor._ import akka.pattern._ import akka.util.duration._ import akka.util.Timeout -import akka.testkit.TestLatch import org.scalatest.matchers.MustMatchers +import akka.testkit.TestLatch /** * Tests the features of the Camel Producer. */ -class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen with MustMatchers { +class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with MustMatchers { import ProducerFeatureTest._ @@ -36,14 +36,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd override protected def afterEach { mockEndpoint.reset() } "A Producer on a sync Camel route" must { - "produce a message and receive normal response" in { - given("a registered two-way producer") val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true))) - when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) - then("a normal response must have been returned by the producer") val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")) Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ assert(result === expected) @@ -52,7 +48,6 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce a message and receive failure response" in { - given("a registered two-way producer") val latch = TestLatch() var deadActor: Option[ActorRef] = None val supervisor = system.actorOf(Props(new Actor { @@ -72,44 +67,31 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } })) val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) - - when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { case Some(Right(e: AkkaCamelException)) ⇒ - then("a failure response must have been returned by the producer") + // a failure response must have been returned by the producer e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) } - then("an AkkaCamelException must have been thrown, which can be used for supervision") - // check that the supervisor stopped the producer and received a Terminated Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) } "produce a message oneway" in { - given("a registered one-way producer") val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway)) - - when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("TEST") producer ! CamelMessage("test", Map()) - - then("the test message must have been sent to mock:mock") mockEndpoint.assertIsSatisfied() } "produces message twoway without sender reference" in { - given("a registered two-way producer") val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1"))) - - when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("test") producer ! CamelMessage("test", Map()) - - then("there must be only a warning that there's no sender reference") mockEndpoint.assertIsSatisfied() } } @@ -117,16 +99,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "A Producer on an async Camel route" must { "produce message to direct:producer-test-3 and receive normal response" in { - given("a registered two-way producer") val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) - - when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ - then("a normal response must have been returned by the producer") + // a normal response must have been returned by the producer val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) @@ -134,16 +113,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message to direct:producer-test-3 and receive failure response" in { - given("a registered two-way producer") val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) - - when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed + Await.ready(future, timeoutDuration).value match { case Some(Right(e: AkkaCamelException)) ⇒ - then("a failure response must have been returned by the producer") + // a failure response must have been returned by the producer e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) @@ -151,17 +127,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in { - given("a registered two-way producer configured with a forward target") val target = system.actorOf(Props[ReplyingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - - when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) Await.result(future, timeoutDuration) match { case result: CamelMessage ⇒ - then("a normal response must have been returned by the forward target") + // a normal response must have been returned by the forward target val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) result must be(expected) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) @@ -169,16 +142,14 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in { - given("a registered two-way producer configured with a forward target") val target = system.actorOf(Props[ReplyingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - - when("a test message causing an exception is sent to the producer with ?") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) + val future = producer.ask(message)(timeoutDuration).failed Await.ready(future, timeoutDuration).value match { case Some(Right(e: AkkaCamelException)) ⇒ - then("a failure response must have been returned by the forward target") + // a failure response must have been returned by the forward target e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) @@ -186,44 +157,28 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in { - given("a registered one-way producer configured with a forward target") val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - - when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) - - then("a normal response must have been produced by the forward target") mockEndpoint.assertIsSatisfied() } "produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { - given("a registered one-way producer configured with a forward target") - val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - - when("a test message causing an exception is sent to the producer with !") mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) - - then("a failure response must have been produced by the forward target") mockEndpoint.assertIsSatisfied() } "produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { - given("a registered two-way producer configured with a forward target") val target = system.actorOf(Props[ReplyingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - - when("a test message is sent to the producer with ?") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration) - - then("a normal response must have been returned by the forward target") Await.result(future, timeoutDuration) match { case message: CamelMessage ⇒ val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) @@ -233,16 +188,13 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in { - given("a registered two-way producer configured with a forward target") val target = system.actorOf(Props[ReplyingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - when("a test message causing an exception is sent to the producer with ask") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) val future = producer.ask(message)(timeoutDuration).failed Await.ready(future, timeoutDuration).value match { case Some(Right(e: AkkaCamelException)) ⇒ - then("a failure response must have been returned by the forward target") e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) @@ -250,29 +202,19 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } "produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - given("a registered one-way producer configured with a forward target") val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - - when("a test message is sent to the producer with !") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) - - then("a normal response must have been produced by the forward target") mockEndpoint.assertIsSatisfied() } "produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { - given("a registered one-way producer configured with a forward target") val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - - when("a test message causing an exception is sent to the producer with !") mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) - - then("a failure response must have been produced by the forward target") mockEndpoint.assertIsSatisfied() } }