diff --git a/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala b/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala index 7a303e47b3..29889d8bf6 100644 --- a/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala +++ b/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala @@ -3,7 +3,6 @@ package akka.camel * Thrown to indicate that the actor referenced by an endpoint URI cannot be * found in the actor system. * - * @author Martin Krasser */ class ActorNotRegisteredException(uri: String) extends RuntimeException { override def getMessage: String = "Actor [%s] doesn't exist" format uri diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index 70fd61bd2a..c569ae0997 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -14,7 +14,6 @@ import akka.dispatch.Mapper /** * An immutable representation of a Camel message. - * @author Martin Krasser */ case class CamelMessage(body: Any, headers: Map[String, Any]) { def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap) //for Java @@ -138,7 +137,6 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) { /** * Companion object of CamelMessage class. * - * @author Martin Krasser */ object CamelMessage { @@ -182,7 +180,7 @@ object CamelMessage { /** * Positive acknowledgement message (used for application-acknowledged message receipts). * When `autoAck` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage. - * @author Martin Krasser + * */ case object Ack { /** Java API to get the Ack singleton */ diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 2915235745..19ddc85b59 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -13,7 +13,7 @@ import akka.dispatch.Mapper /** * Mixed in by Actor implementations that consume message from Camel endpoints. * - * @author Martin Krasser + * */ trait Consumer extends Actor with CamelSupport { import Consumer._ diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 683ff4f20f..017304ea4d 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -13,8 +13,6 @@ import org.apache.camel.processor.SendProcessor /** * Support trait for producing messages to Camel endpoints. - * - * @author Martin Krasser */ trait ProducerSupport extends Actor with CamelSupport { private[this] var messages = Map[ActorRef, Any]() @@ -160,20 +158,20 @@ trait Producer extends ProducerSupport { this: Actor ⇒ /** * For internal use only. - * @author Martin Krasser + * */ private case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded /** * For internal use only. - * @author Martin Krasser + * */ private case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded /** * A one-way producer. * - * @author Martin Krasser + * */ trait Oneway extends Producer { this: Actor ⇒ override def oneway: Boolean = true diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala index 43ca2701c6..9beb6a8894 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala @@ -6,15 +6,14 @@ package akka.camel.internal import akka.actor._ import collection.mutable.WeakHashMap -import akka.camel._ -import internal.ActivationProtocol._ +import akka.camel.internal.ActivationProtocol._ /** * For internal use only. An actor that tracks activation and de-activation of endpoints. */ -private[akka] final class ActivationTracker extends Actor with ActorLogging { - val activations = new WeakHashMap[ActorRef, ActivationStateMachine] +private[camel] class ActivationTracker extends Actor with ActorLogging { + val activations = new WeakHashMap[ActorRef, ActivationStateMachine] /** * A state machine that keeps track of the endpoint activation status of an actor. */ @@ -22,7 +21,6 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging { type State = PartialFunction[ActivationMessage, Unit] var receive: State = notActivated() - /** * Not activated state * @return a partial function that handles messages in the 'not activated' state @@ -68,8 +66,12 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging { * @return a partial function that handles messages in the 'de-activated' state */ def deactivated: State = { + // deactivated means it was activated at some point, so tell sender it was activated case AwaitActivation(ref) ⇒ sender ! EndpointActivated(ref) case AwaitDeActivation(ref) ⇒ sender ! EndpointDeActivated(ref) + //resurrected at restart. + case msg @ EndpointActivated(ref) ⇒ + receive = activated(Nil) } /** @@ -80,6 +82,7 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging { def failedToActivate(cause: Throwable): State = { case AwaitActivation(ref) ⇒ sender ! EndpointFailedToActivate(ref, cause) case AwaitDeActivation(ref) ⇒ sender ! EndpointFailedToActivate(ref, cause) + case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring. } /** @@ -90,6 +93,7 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging { def failedToDeActivate(cause: Throwable): State = { case AwaitActivation(ref) ⇒ sender ! EndpointActivated(ref) case AwaitDeActivation(ref) ⇒ sender ! EndpointFailedToDeActivate(ref, cause) + case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring. } } @@ -114,4 +118,4 @@ private[camel] case class AwaitActivation(ref: ActorRef) extends ActivationMessa * For internal use only. * @param ref the actorRef */ -private[camel] case class AwaitDeActivation(ref: ActorRef) extends ActivationMessage(ref) \ No newline at end of file +private[camel] case class AwaitDeActivation(ref: ActorRef) extends ActivationMessage(ref) diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala index 1d16c3003e..b6a991d4d5 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala @@ -11,7 +11,7 @@ import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage } * This adapter is used to convert to immutable messages to be used with Actors, and convert the immutable messages back * to org.apache.camel.Message when using Camel. * - * @author Martin Krasser + * */ private[camel] class CamelExchangeAdapter(val exchange: Exchange) { /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala index b19bdbc0a2..bbad41e02f 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -115,9 +115,9 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca case msg @ Register(producer, _, None) ⇒ if (!producers(producer)) { producers += producer - producerRegistrar forward msg parent ! AddWatch(producer) } + producerRegistrar forward msg case DeRegister(actorRef) ⇒ producers.find(_ == actorRef).foreach { p ⇒ deRegisterProducer(p) @@ -155,6 +155,8 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto } catch { case NonFatal(e) ⇒ throw new ActorActivationException(producer, e) } + } else { + camelObjects.get(producer).foreach { case (endpoint, processor) ⇒ producer ! CamelProducerObjects(endpoint, processor) } } case DeRegister(producer) ⇒ camelObjects.get(producer).foreach { diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala index 2caf952c6a..a27c23ec2f 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerActorRouteBuilder.scala @@ -16,7 +16,7 @@ import org.apache.camel.model.RouteDefinition * * @param endpointUri endpoint URI of the consumer actor. * - * @author Martin Krasser + * */ private[camel] class ConsumerActorRouteBuilder(endpointUri: String, consumer: ActorRef, config: ConsumerConfig, settings: CamelSettings) extends RouteBuilder { diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 7400de9810..2585b970c9 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -31,7 +31,7 @@ import scala.util.{ Failure, Success, Try } * Messages are sent to [[akka.camel.Consumer]] actors through a [[akka.camel.internal.component.ActorEndpoint]] that * this component provides. * - * @author Martin Krasser + * */ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends DefaultComponent { /** @@ -52,7 +52,7 @@ private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends D * [actorPath]?[options]%s, * where [actorPath] refers to the actor path to the actor. * - * @author Martin Krasser + * */ private[camel] class ActorEndpoint(uri: String, comp: ActorComponent, @@ -104,7 +104,7 @@ private[camel] trait ActorEndpointConfig { * @see akka.camel.component.ActorComponent * @see akka.camel.component.ActorEndpoint * - * @author Martin Krasser + * */ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) extends DefaultProducer(endpoint) with AsyncProcessor { /** diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index cd353e04a0..7688df5130 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -12,7 +12,7 @@ import org.apache.camel.impl.DefaultCamelContext /** * Subclass this abstract class to create an untyped producer actor. This class is meant to be used from Java. * - * @author Martin Krasser + * */ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { /** diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index e8b178a463..d8aec8a761 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit; import akka.testkit.AkkaSpec; import static org.junit.Assert.*; /** - * @author Martin Krasser + * */ public class ConsumerJavaTestBase { diff --git a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java index 95cdc5007b..6b366b0a10 100644 --- a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java @@ -18,7 +18,7 @@ import java.util.*; import static org.junit.Assert.assertEquals; /** - * @author Martin Krasser + * */ public class MessageJavaTestBase { static Camel camel; diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java index e8a057e1ac..92fb124a11 100644 --- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java @@ -15,7 +15,7 @@ import scala.Option; import scala.concurrent.duration.FiniteDuration; /** - * @author Martin Krasser + * */ public class SampleErrorHandlingConsumer extends UntypedConsumerActor { private static Mapper> mapper = new Mapper>() { diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java index be293c21b9..030c951cc9 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java @@ -7,7 +7,7 @@ package akka.camel; import akka.camel.javaapi.UntypedConsumerActor; /** - * @author Martin Krasser + * */ public class SampleUntypedConsumer extends UntypedConsumerActor { diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java index 375ef36835..b99a7ecc31 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java @@ -6,7 +6,7 @@ package akka.camel; import akka.camel.javaapi.UntypedProducerActor; /** - * @author Martin Krasser + * */ public class SampleUntypedForwardingProducer extends UntypedProducerActor { diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedReplyingProducer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedReplyingProducer.java index 039494fd00..c47187d1da 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedReplyingProducer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedReplyingProducer.java @@ -7,7 +7,7 @@ package akka.camel; import akka.camel.javaapi.UntypedProducerActor; /** - * @author Martin Krasser + * */ public class SampleUntypedReplyingProducer extends UntypedProducerActor { diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index acc72ff9b1..6462e0b191 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -30,7 +30,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { filterEvents(EventFilter[ActorActivationException](occurrences = 1)) { - val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) + val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor") intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration) } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 7cf9e92464..07a46781c8 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -19,13 +19,20 @@ import scala.concurrent.duration._ import akka.util.Timeout import org.scalatest.matchers.MustMatchers import akka.testkit._ +import akka.actor.Status.Failure /** * Tests the features of the Camel Producer. */ -class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with MustMatchers { +class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)) with WordSpec with BeforeAndAfterAll with BeforeAndAfterEach with MustMatchers { import ProducerFeatureTest._ + implicit def camel = CamelExtension(system) + + override protected def afterAll() { + super.afterAll() + system.shutdown() + } val camelContext = camel.context // to make testing equality of messages easier, otherwise the breadcrumb shows up in the result. @@ -40,9 +47,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce a message and receive normal response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "direct-producer-2") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - val expected = CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")) - Await.result(future, timeoutDuration) must be === expected + producer.tell(message, testActor) + expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))) stopGracefully(producer) } @@ -65,12 +71,17 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd case _: AkkaCamelException ⇒ Stop } }), name = "prod-anonymous-supervisor") - val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) + + supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor) + val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef] val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + producer.tell(message, testActor) + expectMsgPF(timeoutDuration) { + case Failure(e: AkkaCamelException) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + } } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) @@ -101,15 +112,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message to direct:producer-test-3 and receive normal response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "direct-producer-test-3") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - - Await.result(future, timeoutDuration) match { - case result: CamelMessage ⇒ - // a normal response must have been returned by the producer - val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")) - result must be(expected) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + producer.tell(message, testActor) + expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))) stopGracefully(producer) } @@ -118,9 +122,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + producer.tell(message, testActor) + expectMsgPF(timeoutDuration) { + case Failure(e: AkkaCamelException) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + } } stopGracefully(producer) } @@ -129,15 +136,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val target = system.actorOf(Props[ReplyingForwardTarget], name = "reply-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "direct-producer-test-2-forwarder") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - - Await.result(future, timeoutDuration) match { - case result: CamelMessage ⇒ - // a normal response must have been returned by the forward target - val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) - result must be(expected) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + producer.tell(message, testActor) + expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) stopGracefully(target, producer) } @@ -147,9 +147,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + producer.tell(message, testActor) + expectMsgPF(timeoutDuration) { + case Failure(e: AkkaCamelException) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + } } stopGracefully(target, producer) } @@ -180,13 +183,8 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "direct-producer-test-3-to-replying-actor") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration) - Await.result(future, timeoutDuration) match { - case message: CamelMessage ⇒ - val expected = CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")) - message must be(expected) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) - } + producer.tell(message, testActor) + expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) stopGracefully(target, producer) } @@ -196,9 +194,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { - val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + producer.tell(message, testActor) + expectMsgPF(timeoutDuration) { + case Failure(e: AkkaCamelException) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) + } } stopGracefully(target, producer) } @@ -224,6 +225,23 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd } stopGracefully(target, producer) } + + "keep producing messages after error" in { + import TestSupport._ + val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "intermittentTest-error-consumer") + val producer = start(new SimpleProducer("direct:intermittentTest-1"), "intermittentTest-producer") + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val futureFailed = producer.tell("fail", testActor) + expectMsgPF(timeoutDuration) { + case Failure(e) ⇒ + e.getMessage must be("fail") + } + producer.tell("OK", testActor) + expectMsg("OK") + } + stop(consumer) + stop(producer) + } } private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint]) @@ -239,6 +257,11 @@ object ProducerFeatureTest { class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri + override def preRestart(reason: Throwable, message: Option[Any]) { + //overriding on purpose so it doesn't try to deRegister and reRegister at restart, + // which would cause a deadletter message in the test output. + } + override protected def transformOutgoingMessage(msg: Any) = msg match { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ body.toUpperCase @@ -303,4 +326,18 @@ object ProducerFeatureTest { } } + class SimpleProducer(override val endpointUri: String) extends Producer { + override protected def transformResponse(msg: Any) = msg match { + case m: CamelMessage ⇒ m.bodyAs[String] + case m: Any ⇒ m + } + } + + class IntermittentErrorConsumer(override val endpointUri: String) extends Consumer { + def receive = { + case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender ! Failure(new Exception("fail")) + case msg: CamelMessage ⇒ sender ! msg + } + } + } diff --git a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala index 99ebeafed7..783e7ab9a5 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala @@ -109,6 +109,14 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyActivated() } + + "send activation message when an actor is activated, deactivated and activated again" taggedAs TimingTest in { + publish(EndpointActivated(actor.ref)) + publish(EndpointDeActivated(actor.ref)) + publish(EndpointActivated(actor.ref)) + awaiting.awaitActivation() + awaiting.verifyActivated() + } } class Awaiting(actor: TestProbe) {