pekko/akka-camel/src/main/scala/akka/camel/Activation.scala

88 lines
3.8 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel
import akka.camel.internal._
import akka.util.Timeout
2012-07-04 15:25:30 +02:00
import scala.concurrent.Future
import java.util.concurrent.TimeoutException
import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._
import scala.concurrent.util.Duration
/**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
* The Camel endpoints are activated asynchronously. This trait can signal when an endpoint is activated or de-activated.
*/
trait Activation {
import scala.concurrent.Await
2012-05-23 15:17:49 +02:00
def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
2012-05-23 15:17:49 +02:00
private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
/**
* Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires.
* @param endpoint the endpoint to wait for to be activated
* @param timeout the timeout for the wait
* @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout.
2012-03-18 10:46:08 +01:00
* @return the activated ActorRef
*/
2012-05-23 15:17:49 +02:00
def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef =
try Await.result(activationFutureFor(endpoint, timeout), timeout) catch {
case e: TimeoutException throw new ActivationTimeoutException(endpoint, timeout)
}
/**
* Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires.
* @param endpoint the endpoint to wait for to be de-activated
* @param timeout the timeout for the wait
* @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout.
*/
2012-05-23 15:17:49 +02:00
def awaitDeactivation(endpoint: ActorRef, timeout: Duration): Unit =
try Await.result(deactivationFutureFor(endpoint, timeout), timeout) catch {
case e: TimeoutException throw new DeActivationTimeoutException(endpoint, timeout)
}
/**
* Similar to `awaitActivation` but returns a future instead.
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
2012-05-23 15:17:49 +02:00
def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] {
case EndpointActivated(_) endpoint
case EndpointFailedToActivate(_, cause) throw cause
}
/**
* Similar to awaitDeactivation but returns a future instead.
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
2012-05-23 15:17:49 +02:00
def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] {
case EndpointDeActivated(_) ()
case EndpointFailedToDeActivate(_, cause) throw cause
}
}
/**
2012-03-18 10:46:08 +01:00
* An exception for when a timeout has occurred during deactivation of an endpoint.
* @param endpoint the endpoint that could not be de-activated in time
* @param timeout the timeout
*/
class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
2012-05-23 15:17:49 +02:00
override def getMessage: String = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path)
}
/**
2012-03-18 10:46:08 +01:00
* An exception for when a timeout has occurred during the activation of an endpoint.
* @param endpoint the endpoint that could not be activated in time
* @param timeout the timeout
*/
class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
2012-05-23 15:17:49 +02:00
override def getMessage: String = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path)
}