From a86bafddfc077aeb35de6678fa0973664eae1cf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Fri, 23 Aug 2013 14:29:15 +0200 Subject: [PATCH] =cam #3541 Log Camel endpoint activation/deactivation failures as error --- .../akka/camel/internal/CamelSupervisor.scala | 21 +++++++++++++++---- .../akka/camel/ConsumerIntegrationTest.scala | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala index d16b387f91..2ae44ff2b6 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -13,6 +13,7 @@ import akka.actor.SupervisorStrategy.Resume import akka.camel.internal.CamelSupervisor._ import akka.AkkaException import akka.camel.internal.ActivationProtocol._ +import akka.event.Logging /** * INTERNAL API @@ -75,13 +76,13 @@ private[camel] object CamelSupervisor { * INTERNAL API * Thrown by registrars to indicate that the actor could not be de-activated. */ -private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause) +private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException(s"$actorRef failed to de-activate", cause) /** * INTERNAL API * Thrown by the registrars to indicate that the actor could not be activated. */ -private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause) +private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException(s"$actorRef failed to activate", cause) /** * INTERNAL API @@ -95,7 +96,19 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca private var producers = Set[ActorRef]() private var consumers = Set[ActorRef]() - override val supervisorStrategy = OneForOneStrategy() { + class RegistryLogStrategy()(_decider: SupervisorStrategy.Decider) extends OneForOneStrategy()(_decider) { + override def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, + decision: SupervisorStrategy.Directive): Unit = + cause match { + case _: ActorActivationException | _: ActorDeActivationException ⇒ + try context.system.eventStream.publish { + Logging.Error(cause.getCause, child.path.toString, getClass, cause.getMessage) + } catch { case NonFatal(_) ⇒ } + case _ ⇒ super.logFailure(context, child, cause, decision) + } + } + + override val supervisorStrategy = new RegistryLogStrategy()({ case e: ActorActivationException ⇒ activationTracker ! EndpointFailedToActivate(e.actorRef, e.getCause) stop(e.actorRef) @@ -106,7 +119,7 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca Resume case NonFatal(e) ⇒ Resume - } + }) def receive = { case msg @ Register(consumer, _, Some(_)) ⇒ diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 1c8881ef22..2900c9a342 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -29,7 +29,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - filterEvents(EventFilter.warning(pattern = "failed to activate.*", occurrences = 1)) { + filterEvents(EventFilter[FailedToCreateRouteException](pattern = "failed to activate.*", occurrences = 1)) { val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor") intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration)