diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index 9aec23b4c6..682e6ba4bf 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -214,7 +214,7 @@ private[akka] object MessageDispatcher {
// dispatcher debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
- final val debug = false
+ final val debug = false // Deliberately without type ascription to make it a compile-time constant
lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _)
def printActors: Unit = if (debug) {
for {
diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
index 0f0bbad1ee..35b1e35012 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
@@ -31,15 +31,15 @@ private[akka] object Mailbox {
*/
// primary status: only first three
- final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero!
- final val Suspended = 1
- final val Closed = 2
+ final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
+ final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant
+ final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant
// secondary status: Scheduled bit may be added to Open/Suspended
- final val Scheduled = 4
+ final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant
// mailbox debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
- final val debug = false
+ final val debug = false // Deliberately without type ascription to make it a compile-time constant
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala
index 56d116dca8..d01c990136 100644
--- a/akka-camel/src/main/scala/akka/camel/Activation.scala
+++ b/akka-camel/src/main/scala/akka/camel/Activation.scala
@@ -18,9 +18,9 @@ import akka.pattern._
trait Activation {
import akka.dispatch.Await
- def system: ActorSystem
+ def system: ActorSystem //FIXME Why is this here, what's it needed for and who should use it?
- private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker")
+ private val activationTracker = system.actorOf(Props[ActivationTracker], "camelActivationTracker") //FIXME Why is this also top level?
/**
* Awaits for endpoint to be activated. It blocks until the endpoint is registered in camel context or timeout expires.
@@ -29,13 +29,10 @@ trait Activation {
* @throws akka.camel.ActivationTimeoutException if endpoint is not activated within timeout.
* @return the activated ActorRef
*/
- def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef = {
- try {
- Await.result(activationFutureFor(endpoint, timeout), timeout)
- } catch {
+ def awaitActivation(endpoint: ActorRef, timeout: Duration): ActorRef =
+ try Await.result(activationFutureFor(endpoint, timeout), timeout) catch {
case e: TimeoutException ⇒ throw new ActivationTimeoutException(endpoint, timeout)
}
- }
/**
* Awaits for endpoint to be de-activated. It is blocking until endpoint is unregistered in camel context or timeout expires.
@@ -43,37 +40,32 @@ trait Activation {
* @param timeout the timeout for the wait
* @throws akka.camel.DeActivationTimeoutException if endpoint is not de-activated within timeout.
*/
- def awaitDeactivation(endpoint: ActorRef, timeout: Duration) {
- try {
- Await.result(deactivationFutureFor(endpoint, timeout), timeout)
- } catch {
+ def awaitDeactivation(endpoint: ActorRef, timeout: Duration): Unit =
+ try Await.result(deactivationFutureFor(endpoint, timeout), timeout) catch {
case e: TimeoutException ⇒ throw new DeActivationTimeoutException(endpoint, timeout)
}
- }
/**
* Similar to `awaitActivation` but returns a future instead.
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
- def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] = {
+ def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] {
case EndpointActivated(_) ⇒ endpoint
case EndpointFailedToActivate(_, cause) ⇒ throw cause
}
- }
/**
* Similar to awaitDeactivation but returns a future instead.
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
- def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] = {
+ def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] {
case EndpointDeActivated(_) ⇒ ()
case EndpointFailedToDeActivate(_, cause) ⇒ throw cause
}
- }
}
/**
@@ -82,7 +74,7 @@ trait Activation {
* @param timeout the timeout
*/
class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
- override def getMessage = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path)
+ override def getMessage: String = "Timed out after %s, while waiting for de-activation of %s" format (timeout, endpoint.path)
}
/**
@@ -91,5 +83,5 @@ class DeActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extend
* @param timeout the timeout
*/
class ActivationTimeoutException(endpoint: ActorRef, timeout: Duration) extends TimeoutException {
- override def getMessage = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path)
+ override def getMessage: String = "Timed out after %s, while waiting for activation of %s" format (timeout, endpoint.path)
}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala b/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala
index a468eeace5..7a303e47b3 100644
--- a/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala
+++ b/akka-camel/src/main/scala/akka/camel/ActorNotRegisteredException.scala
@@ -6,5 +6,5 @@ package akka.camel
* @author Martin Krasser
*/
class ActorNotRegisteredException(uri: String) extends RuntimeException {
- override def getMessage = "Actor [%s] doesn't exist" format uri
+ override def getMessage: String = "Actor [%s] doesn't exist" format uri
}
diff --git a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
index f5175b90eb..6286edad87 100644
--- a/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
+++ b/akka-camel/src/main/scala/akka/camel/ActorRouteDefinition.scala
@@ -29,7 +29,8 @@ class ActorRouteDefinition(definition: ProcessorDefinition[_]) {
* @param actorRef the consumer with a default configuration.
* @return the path to the actor, as a camel uri String
*/
- def to(actorRef: ActorRef) = definition.to(ActorEndpointPath(actorRef).toCamelPath())
+ def to(actorRef: ActorRef) = //FIXME What is the return type of this?
+ definition.to(ActorEndpointPath(actorRef).toCamelPath())
/**
* Sends the message to an ActorRef endpoint
@@ -37,6 +38,7 @@ class ActorRouteDefinition(definition: ProcessorDefinition[_]) {
* @param consumerConfig the configuration for the consumer
* @return the path to the actor, as a camel uri String
*/
- def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig))
+ def to(actorRef: ActorRef, consumerConfig: ConsumerConfig) = //FIXME What is the return type of this?
+ definition.to(ActorEndpointPath(actorRef).toCamelPath(consumerConfig))
}
diff --git a/akka-camel/src/main/scala/akka/camel/Camel.scala b/akka-camel/src/main/scala/akka/camel/Camel.scala
index 4e96f038e5..72252212cf 100644
--- a/akka-camel/src/main/scala/akka/camel/Camel.scala
+++ b/akka-camel/src/main/scala/akka/camel/Camel.scala
@@ -50,13 +50,13 @@ object CamelExtension extends ExtensionId[Camel] with ExtensionIdProvider {
/**
* Creates a new instance of Camel and makes sure it gets stopped when the actor system is shutdown.
*/
- def createExtension(system: ExtendedActorSystem) = {
+ override def createExtension(system: ExtendedActorSystem): Camel = {
val camel = new DefaultCamel(system).start
system.registerOnTermination(camel.shutdown())
camel
}
- def lookup(): ExtensionId[Camel] = CamelExtension
+ override def lookup(): ExtensionId[Camel] = CamelExtension
override def get(system: ActorSystem): Camel = super.get(system)
}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
index 2ea046b856..4f617c83a4 100644
--- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -21,12 +21,12 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap) //for Java
- override def toString = "CamelMessage(%s, %s)" format (body, headers)
+ override def toString: String = "CamelMessage(%s, %s)" format (body, headers)
/**
* Returns those headers from this message whose name is contained in names.
*/
- def headers(names: Set[String]): Map[String, Any] = headers.filterKeys(names contains _)
+ def headers(names: Set[String]): Map[String, Any] = headers filterKeys names
/**
* Returns those headers from this message whose name is contained in names.
@@ -75,7 +75,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
/**
* Creates a CamelMessage with a given body.
*/
- def withBody(body: Any) = CamelMessage(body, this.headers)
+ def withBody(body: Any): CamelMessage = CamelMessage(body, this.headers)
/**
* Creates a new CamelMessage with given headers.
@@ -119,9 +119,9 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Creates a new CamelMessage where the header with given headerName is removed from
* the existing headers.
*/
- def withoutHeader(headerName: String) = copy(this.body, this.headers - headerName)
+ def withoutHeader(headerName: String): CamelMessage = copy(this.body, this.headers - headerName)
- def copyContentTo(to: JCamelMessage) = {
+ def copyContentTo(to: JCamelMessage): Unit = {
to.setBody(this.body)
for ((name, value) ← this.headers) to.getHeaders.put(name, value.asInstanceOf[AnyRef])
}
@@ -145,8 +145,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API
*
*/
- def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T =
- camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
/**
* Creates a CamelMessage with current body converted to type T.
@@ -184,7 +183,7 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
*
* Java API
*/
- def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext) = headerAs[T](name)(Manifest.classType(clazz), camelContext).get
+ def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = headerAs[T](name)(Manifest.classType(clazz), camelContext).get
}
@@ -201,7 +200,7 @@ object CamelMessage {
* so that it can be correlated with an asynchronous response. Messages send to Consumer
* actors have this header already set.
*/
- val MessageExchangeId = "MessageExchangeId".intern
+ val MessageExchangeId = "MessageExchangeId".intern //Deliberately without type ascription to make it a constant
/**
* Creates a canonical form of the given message msg. If msg of type
@@ -244,5 +243,7 @@ case object Ack {
* message or Exchange.getOut message, depending on the exchange pattern.
*
*/
-class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any] = Map.empty)
- extends AkkaException(cause.getMessage, cause)
+class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any])
+ extends AkkaException(cause.getMessage, cause) {
+ def this(cause: Throwable) = this(cause, Map.empty)
+}
diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala
index 1d21ffbec7..0351ce39cb 100644
--- a/akka-camel/src/main/scala/akka/camel/Consumer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala
@@ -31,7 +31,7 @@ trait ConsumerConfig {
/**
* How long the actor should wait for activation before it fails.
*/
- def activationTimeout: Duration = 10 seconds
+ def activationTimeout: Duration = 10 seconds // FIXME Should be configured in reference.conf
/**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time
@@ -39,14 +39,14 @@ trait ConsumerConfig {
* This setting is used for out-capable, in-only, manually acknowledged communication.
* When the blocking is set to Blocking replyTimeout is ignored.
*/
- def replyTimeout: Duration = 1 minute
+ def replyTimeout: Duration = 1 minute // FIXME Should be configured in reference.conf
/**
* Determines whether one-way communications between an endpoint and this consumer actor
* should be auto-acknowledged or application-acknowledged.
* This flag has only effect when exchange is in-only.
*/
- def autoack: Boolean = true
+ def autoack: Boolean = true // FIXME Should be configured in reference.conf
/**
* The route definition handler for creating a custom route to this consumer instance.
diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala
index 33541d4611..5a7262a133 100644
--- a/akka-camel/src/main/scala/akka/camel/Producer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Producer.scala
@@ -6,8 +6,9 @@ package akka.camel
import akka.actor.Actor
import internal.CamelExchangeAdapter
-import org.apache.camel.{ Exchange, ExchangePattern, AsyncCallback }
import akka.actor.Status.Failure
+import org.apache.camel.{ Endpoint, Exchange, ExchangePattern, AsyncCallback }
+import org.apache.camel.processor.SendProcessor
/**
* Support trait for producing messages to Camel endpoints.
@@ -15,19 +16,19 @@ import akka.actor.Status.Failure
* @author Martin Krasser
*/
trait ProducerSupport { this: Actor ⇒
- protected[this] implicit def camel = CamelExtension(context.system)
+ protected[this] implicit def camel = CamelExtension(context.system) // FIXME This is duplicated from Consumer, create a common base-trait?
/**
* camelContext implicit is useful when using advanced methods of CamelMessage.
*/
- protected[this] implicit def camelContext = camel.context
+ protected[this] implicit def camelContext = camel.context // FIXME This is duplicated from Consumer, create a common base-trait?
- protected[this] lazy val (endpoint, processor) = camel.registerProducer(self, endpointUri)
+ protected[this] lazy val (endpoint: Endpoint, processor: SendProcessor) = camel.registerProducer(self, endpointUri)
/**
* CamelMessage headers to copy by default from request message to response-message.
*/
- private val headersToCopyDefault = Set(CamelMessage.MessageExchangeId)
+ private val headersToCopyDefault: Set[String] = Set(CamelMessage.MessageExchangeId)
/**
* If set to false (default), this producer expects a response message from the Camel endpoint.
@@ -64,20 +65,21 @@ trait ProducerSupport { this: Actor ⇒
* @param pattern exchange pattern
*/
protected def produce(msg: Any, pattern: ExchangePattern): Unit = {
- implicit def toExchangeAdapter(exchange: Exchange): CamelExchangeAdapter = new CamelExchangeAdapter(exchange)
+ // Need copies of sender reference here since the callback could be done
+ // later by another thread.
+ val producer = self
+ val originalSender = sender
val cmsg = CamelMessage.canonicalize(msg)
- val exchange = endpoint.createExchange(pattern)
- exchange.setRequest(cmsg)
- processor.process(exchange, new AsyncCallback {
- val producer = self
- // Need copies of sender reference here since the callback could be done
- // later by another thread.
- val originalSender = sender
+ val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
+
+ xchg.setRequest(cmsg)
+
+ processor.process(xchg.exchange, new AsyncCallback {
// Ignoring doneSync, sending back async uniformly.
def done(doneSync: Boolean): Unit = producer.tell(
- if (exchange.isFailed) exchange.toFailureResult(cmsg.headers(headersToCopy))
- else MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
+ if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
+ else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
})
}
@@ -94,9 +96,7 @@ trait ProducerSupport { this: Actor ⇒
val e = new AkkaCamelException(res.cause, res.headers)
routeResponse(Failure(e))
throw e
- case msg ⇒
- val exchangePattern = if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut
- produce(transformOutgoingMessage(msg), exchangePattern)
+ case msg ⇒ produce(transformOutgoingMessage(msg), if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
}
/**
@@ -134,7 +134,7 @@ trait Producer extends ProducerSupport { this: Actor ⇒
* Default implementation of Actor.receive. Any messages received by this actors
* will be produced to the endpoint specified by endpointUri.
*/
- def receive = produce
+ def receive: Actor.Receive = produce
}
/**
@@ -153,6 +153,6 @@ private case class FailureResult(cause: Throwable, headers: Map[String, Any] = M
* @author Martin Krasser
*/
trait Oneway extends Producer { this: Actor ⇒
- override def oneway = true
+ override def oneway: Boolean = true
}
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
index b8c3f42a47..bdd915ff70 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationMessage.scala
@@ -20,7 +20,7 @@ private[camel] abstract class ActivationMessage(val actor: ActorRef)
*
*/
private[camel] object ActivationMessage {
- def unapply(msg: ActivationMessage): Option[ActorRef] = Some(msg.actor)
+ def unapply(msg: ActivationMessage): Option[ActorRef] = Option(msg.actor)
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
index 0b93460be0..f5a87eff25 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
@@ -96,17 +96,15 @@ private[akka] final class ActivationTracker extends Actor with ActorLogging {
/**
* Subscribes self to messages of type ActivationMessage
*/
- override def preStart() {
- context.system.eventStream.subscribe(self, classOf[ActivationMessage])
- }
+ override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ActivationMessage])
override def receive = {
case msg @ ActivationMessage(ref) ⇒
- val state = activations.getOrElseUpdate(ref, new ActivationStateMachine)
- (state.receive orElse logStateWarning(ref))(msg)
+ (activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg)
}
- private[this] def logStateWarning(actorRef: ActorRef): Receive = { case msg ⇒ log.warning("Message [{}] not expected in current state of actor [{}]", msg, actorRef) }
+ private[this] def logStateWarning(actorRef: ActorRef): Receive =
+ { case msg ⇒ log.warning("Message [{}] not expected in current state of actor [{}]", msg, actorRef) }
}
/**
diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
index 1f2d80e6df..5de9eb447d 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
@@ -16,34 +16,34 @@ import akka.camel.{ FailureResult, AkkaCamelException, CamelMessage }
*
* @author Martin Krasser
*/
-private[camel] class CamelExchangeAdapter(exchange: Exchange) {
+private[camel] class CamelExchangeAdapter(val exchange: Exchange) {
/**
* Returns the exchange id
*/
- def getExchangeId = exchange.getExchangeId
+ def getExchangeId: String = exchange.getExchangeId
/**
* Returns if the exchange is out capable.
*/
- def isOutCapable = exchange.getPattern.isOutCapable
+ def isOutCapable: Boolean = exchange.getPattern.isOutCapable
/**
* Sets Exchange.getIn from the given CamelMessage object.
*/
- def setRequest(msg: CamelMessage) { msg.copyContentTo(request) }
+ def setRequest(msg: CamelMessage): Unit = msg.copyContentTo(request)
/**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* CamelMessage object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn.
*/
- def setResponse(msg: CamelMessage) { msg.copyContentTo(response) }
+ def setResponse(msg: CamelMessage): Unit = msg.copyContentTo(response)
/**
* Sets Exchange.getException from the given FailureResult message. Headers of the FailureResult message
* are ignored.
*/
- def setFailure(msg: FailureResult) { exchange.setException(msg.cause) }
+ def setFailure(msg: FailureResult): Unit = exchange.setException(msg.cause)
/**
* Creates an immutable CamelMessage object from Exchange.getIn so it can be used with Actors.
@@ -120,7 +120,7 @@ private[camel] class CamelExchangeAdapter(exchange: Exchange) {
*/
def toResponseMessage(headers: Map[String, Any]): CamelMessage = CamelMessage.from(response, headers)
- private def request = exchange.getIn
+ private def request: JCamelMessage = exchange.getIn
private def response: JCamelMessage = ExchangeHelper.getResultMessage(exchange)
diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
index 1754bb0073..2ac35fdec2 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
@@ -2,12 +2,12 @@ package akka.camel.internal
import akka.actor.ActorSystem
import component.{ DurationTypeConverter, ActorComponent }
-import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import scala.Predef._
import akka.event.Logging
import akka.camel.Camel
import akka.util.{ NonFatal, Duration }
+import org.apache.camel.{ ProducerTemplate, CamelContext }
/**
* For internal use only.
@@ -33,14 +33,14 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
ctx
}
- lazy val template = context.createProducerTemplate()
+ lazy val template: ProducerTemplate = context.createProducerTemplate()
/**
* Starts camel and underlying camel context and template.
* Only the creator of Camel should start and stop it.
* @see akka.camel.DefaultCamel#stop()
*/
- def start = {
+ def start(): this.type = {
context.start()
try template.start() catch { case NonFatal(e) ⇒ context.stop(); throw e }
log.debug("Started CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
@@ -54,9 +54,9 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
*
* @see akka.camel.DefaultCamel#start()
*/
- def shutdown() {
+ def shutdown(): Unit = {
try context.stop() finally {
- try { template.stop() } catch { case NonFatal(e) ⇒ log.debug("Swallowing non-fatal exception [{}] on stopping Camel producer template", e) }
+ try template.stop() catch { case NonFatal(e) ⇒ log.debug("Swallowing non-fatal exception [{}] on stopping Camel producer template", e) }
}
log.debug("Stopped CamelContext[{}] for ActorSystem[{}]", context.getName, system.name)
}
diff --git a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
index 03d130efe2..d338dbfdea 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
@@ -11,6 +11,8 @@ import akka.util.NonFatal
* Watches the end of life of Producers.
* Removes a Producer from the ProducerRegistry when it is Terminated,
* which in turn stops the SendProcessor.
+ *
+ * INTERNAL API
*/
private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
override def receive = {
@@ -19,6 +21,9 @@ private class ProducerWatcher(registry: ProducerRegistry) extends Actor {
}
}
+/**
+ * INTERNAL API
+ */
private case class RegisterProducer(actorRef: ActorRef)
/**
@@ -27,14 +32,11 @@ private case class RegisterProducer(actorRef: ActorRef)
* Every Producer needs an Endpoint and a SendProcessor
* to produce messages over an Exchange.
*/
-private[camel] trait ProducerRegistry {
- this: Camel ⇒
+private[camel] trait ProducerRegistry { this: Camel ⇒
private val camelObjects = new ConcurrentHashMap[ActorRef, (Endpoint, SendProcessor)]()
- private val watcher = system.actorOf(Props(new ProducerWatcher(this)))
+ private val watcher = system.actorOf(Props(new ProducerWatcher(this))) //FIXME should this really be top level?
- private def registerWatch(actorRef: ActorRef) {
- watcher ! RegisterProducer(actorRef)
- }
+ private def registerWatch(actorRef: ActorRef): Unit = watcher ! RegisterProducer(actorRef)
/**
* For internal use only.
@@ -77,7 +79,7 @@ private[camel] trait ProducerRegistry {
case NonFatal(e) ⇒ {
system.eventStream.publish(EndpointFailedToActivate(actorRef, e))
// can't return null to the producer actor, so blow up actor in initialization.
- throw e
+ throw e //FIXME I'm not a huge fan of log-rethrow, either log or rethrow
}
}
}
diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
index 7ec5919dc9..a8d7a59b61 100644
--- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
@@ -35,10 +35,8 @@ private[camel] class ActorComponent(camel: Camel) extends DefaultComponent {
/**
* @see org.apache.camel.Component
*/
- def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = {
- val path = ActorEndpointPath.fromCamelPath(remaining)
- new ActorEndpoint(uri, this, path, camel)
- }
+ def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
+ new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(remaining), camel)
}
/**
@@ -92,7 +90,7 @@ private[camel] class ActorEndpoint(uri: String,
private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath
- @BeanProperty var replyTimeout: Duration = 1 minute
+ @BeanProperty var replyTimeout: Duration = 1 minute // FIXME default should be in config, not code
/**
* Whether to auto-acknowledge one-way message exchanges with (untyped) actors. This is
@@ -117,7 +115,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* Calls the asynchronous version of the method and waits for the result (blocking).
* @param exchange the exchange to process
*/
- def process(exchange: Exchange) { processExchangeAdapter(new CamelExchangeAdapter(exchange)) }
+ def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange))
/**
* Processes the message exchange. the caller supports having the exchange asynchronously processed.
@@ -129,13 +127,15 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* The callback should therefore be careful of starting recursive loop.
* @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
*/
- def process(exchange: Exchange, callback: AsyncCallback): Boolean = { processExchangeAdapter(new CamelExchangeAdapter(exchange), callback) }
+ def process(exchange: Exchange, callback: AsyncCallback): Boolean = processExchangeAdapter(new CamelExchangeAdapter(exchange), callback)
/**
* For internal use only. Processes the [[akka.camel.internal.CamelExchangeAdapter]]
* @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
+ *
+ * WARNING UNBOUNDED BLOCKING AWAITS
*/
- private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter) {
+ private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
val isDone = new CountDownLatch(1)
processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
isDone.await() // this should never wait forever as the process(exchange, callback) method guarantees that.
@@ -151,10 +151,10 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = {
// these notify methods are just a syntax sugar
- def notifyDoneSynchronously[A](a: A = null) = callback.done(true)
- def notifyDoneAsynchronously[A](a: A = null) = callback.done(false)
+ def notifyDoneSynchronously[A](a: A = null): Unit = callback.done(true)
+ def notifyDoneAsynchronously[A](a: A = null): Unit = callback.done(false)
- def message = messageFor(exchange)
+ def message: CamelMessage = messageFor(exchange)
if (exchange.isOutCapable) { //InOut
sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously)
@@ -186,39 +186,29 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = {
try {
- val actor = actorFor(endpoint.path)
- val future = actor.ask(message)(new Timeout(endpoint.replyTimeout))
- future.onComplete(onComplete)
+ actorFor(endpoint.path).ask(message)(Timeout(endpoint.replyTimeout)).onComplete(onComplete)
} catch {
case NonFatal(e) ⇒ onComplete(Left(e))
}
false // Done async
}
- private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter) {
- try {
- actorFor(endpoint.path) ! message
- } catch {
- case e ⇒ exchange.setFailure(new FailureResult(e))
- }
- }
+ private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter): Unit =
+ try { actorFor(endpoint.path) ! message } catch { case NonFatal(e) ⇒ exchange.setFailure(new FailureResult(e)) }
private[this] def actorFor(path: ActorEndpointPath): ActorRef =
path.findActorIn(camel.system) getOrElse (throw new ActorNotRegisteredException(path.actorPath))
private[this] def messageFor(exchange: CamelExchangeAdapter) =
exchange.toRequestMessage(Map(CamelMessage.MessageExchangeId -> exchange.getExchangeId))
-
}
/**
* For internal use only. Converts Strings to [[akka.util.Duration]]s
*/
private[camel] object DurationTypeConverter extends TypeConverter {
- def convertTo[T](`type`: Class[T], value: AnyRef) = {
- Duration(value.toString).asInstanceOf[T]
- }
- def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef) = convertTo(`type`, value)
+ def convertTo[T](`type`: Class[T], value: AnyRef) = Duration(value.toString).asInstanceOf[T] //FIXME WTF
+ def convertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef): T = convertTo(`type`, value)
def mandatoryConvertTo[T](`type`: Class[T], value: AnyRef) = convertTo(`type`, value)
def mandatoryConvertTo[T](`type`: Class[T], exchange: Exchange, value: AnyRef) = convertTo(`type`, value)
def toString(duration: Duration) = duration.toNanos + " nanos"
@@ -243,15 +233,15 @@ private[camel] case class ActorEndpointPath private (actorPath: String) {
* For internal use only. Companion of `ActorEndpointPath`
*/
private[camel] object ActorEndpointPath {
- private val consumerConfig = new ConsumerConfig {}
+ private val consumerConfig: ConsumerConfig = new ConsumerConfig {}
- def apply(actorRef: ActorRef) = new ActorEndpointPath(actorRef.path.toString)
+ def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString)
/**
* Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the remaining part of the endpoint URI (the part after the scheme, without the parameters of the URI).
* Expects the remaining part of the URI (the actor path) in a format: path:%s
*/
- def fromCamelPath(camelPath: String) = camelPath match {
+ def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match {
case id if id startsWith "path:" ⇒ new ActorEndpointPath(id substring 5)
case _ ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be path:CamelContext.
diff --git a/akka-camel/src/main/scala/akka/package.scala b/akka-camel/src/main/scala/akka/package.scala
index 436d2fc1b3..10382d96ee 100644
--- a/akka-camel/src/main/scala/akka/package.scala
+++ b/akka-camel/src/main/scala/akka/package.scala
@@ -7,5 +7,6 @@ package akka
import org.apache.camel.model.ProcessorDefinition
package object camel {
+ //TODO Why do I exist?
implicit def toActorRouteDefinition(definition: ProcessorDefinition[_]) = new ActorRouteDefinition(definition)
}
\ No newline at end of file