=cam #3541 Log Camel endpoint activation/deactivation failures as error
This commit is contained in:
parent
fcacbbbddf
commit
a86bafddfc
2 changed files with 18 additions and 5 deletions
|
|
@ -13,6 +13,7 @@ import akka.actor.SupervisorStrategy.Resume
|
|||
import akka.camel.internal.CamelSupervisor._
|
||||
import akka.AkkaException
|
||||
import akka.camel.internal.ActivationProtocol._
|
||||
import akka.event.Logging
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -75,13 +76,13 @@ private[camel] object CamelSupervisor {
|
|||
* INTERNAL API
|
||||
* Thrown by registrars to indicate that the actor could not be de-activated.
|
||||
*/
|
||||
private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to de-activate".format(actorRef), cause)
|
||||
private[camel] class ActorDeActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException(s"$actorRef failed to de-activate", cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Thrown by the registrars to indicate that the actor could not be activated.
|
||||
*/
|
||||
private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException("Actor [%s] failed to activate".format(actorRef), cause)
|
||||
private[camel] class ActorActivationException(val actorRef: ActorRef, cause: Throwable) extends AkkaException(s"$actorRef failed to activate", cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -95,7 +96,19 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca
|
|||
private var producers = Set[ActorRef]()
|
||||
private var consumers = Set[ActorRef]()
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
class RegistryLogStrategy()(_decider: SupervisorStrategy.Decider) extends OneForOneStrategy()(_decider) {
|
||||
override def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
|
||||
decision: SupervisorStrategy.Directive): Unit =
|
||||
cause match {
|
||||
case _: ActorActivationException | _: ActorDeActivationException ⇒
|
||||
try context.system.eventStream.publish {
|
||||
Logging.Error(cause.getCause, child.path.toString, getClass, cause.getMessage)
|
||||
} catch { case NonFatal(_) ⇒ }
|
||||
case _ ⇒ super.logFailure(context, child, cause, decision)
|
||||
}
|
||||
}
|
||||
|
||||
override val supervisorStrategy = new RegistryLogStrategy()({
|
||||
case e: ActorActivationException ⇒
|
||||
activationTracker ! EndpointFailedToActivate(e.actorRef, e.getCause)
|
||||
stop(e.actorRef)
|
||||
|
|
@ -106,7 +119,7 @@ private[camel] class Registry(activationTracker: ActorRef) extends Actor with Ca
|
|||
Resume
|
||||
case NonFatal(e) ⇒
|
||||
Resume
|
||||
}
|
||||
})
|
||||
|
||||
def receive = {
|
||||
case msg @ Register(consumer, _, Some(_)) ⇒
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC
|
|||
implicit def ec: ExecutionContext = system.dispatcher
|
||||
|
||||
"Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
|
||||
filterEvents(EventFilter.warning(pattern = "failed to activate.*", occurrences = 1)) {
|
||||
filterEvents(EventFilter[FailedToCreateRouteException](pattern = "failed to activate.*", occurrences = 1)) {
|
||||
val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor")
|
||||
intercept[FailedToCreateRouteException] {
|
||||
Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue