diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala
index caafca4628..e5218a21f2 100644
--- a/akka-camel/src/main/scala/Consumer.scala
+++ b/akka-camel/src/main/scala/Consumer.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.camel
-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.{ActorRef, Actor}
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
@@ -12,9 +12,34 @@ import se.scalablesolutions.akka.actor.Actor
* @author Martin Krasser
*/
trait Consumer { self: Actor =>
-
/**
* Returns the Camel endpoint URI to consume messages from.
*/
def endpointUri: String
+
+ /**
+ * Determines whether two-way communications with this consumer actor should
+ * be done in blocking or non-blocking mode (default is non-blocking). One-way
+ * communications never block.
+ */
+ def blocking = false
}
+
+/**
+ * @author Martin Krasser
+ */
+private[camel] object Consumer {
+ /**
+ * Applies a function f to actorRef if actorRef
+ * references a consumer actor. A valid reference to a consumer actor is a local actor
+ * reference with a target actor that implements the Consumer trait. The
+ * target Consumer object is passed as argument to f. This
+ * method returns None if actorRef is not a valid reference
+ * to a consumer actor, Some result otherwise.
+ */
+ def forConsumer[T](actorRef: ActorRef)(f: Consumer => T): Option[T] = {
+ if (!actorRef.actor.isInstanceOf[Consumer]) None
+ else if (actorRef.remoteAddress.isDefined) None
+ else Some(f(actorRef.actor.asInstanceOf[Consumer]))
+ }
+}
\ No newline at end of file
diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala
index 8d29739f02..298c70c2b7 100644
--- a/akka-camel/src/main/scala/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/ConsumerPublisher.scala
@@ -24,7 +24,7 @@ private[camel] object ConsumerPublisher extends Logging {
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
- CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
+ CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
@@ -32,7 +32,7 @@ private[camel] object ConsumerPublisher extends Logging {
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
- CamelContextManager.context.stopRoute(event.id)
+ CamelContextManager.context.stopRoute(event.uuid)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
@@ -139,14 +139,13 @@ private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) ext
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id refers to Actor.uuid, false if
- * id refers to Actor.getId.
+ * @param uuid actor uuid
+ * @param blocking true for blocking in-out exchanges, false otherwise
*
* @author Martin Krasser
*/
-private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
- protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
+private[camel] class ConsumerActorRoute(endpointUri: String, uuid: String, blocking: Boolean) extends ConsumerRoute(endpointUri, uuid) {
+ protected override def targetUri = "actor:uuid:%s?blocking=%s" format (uuid, blocking)
}
/**
@@ -226,26 +225,23 @@ private[camel] sealed trait ConsumerEvent
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id is the actor's uuid, false if
- * id is the actor's id.
+ * @param uuid actor uuid
+ * @param blocking true for blocking in-out exchanges, false otherwise
*
* @author Martin Krasser
*/
-private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
+private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, uuid: String, blocking: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
- * @param id actor identifier
- * @param uuid true if id is the actor's uuid, false if
- * id is the actor's id.
+ * @param uuid actor uuid
*
* @author Martin Krasser
*/
-private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
+private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, uuid: String) extends ConsumerEvent
/**
* Event indicating that an active object proxy has been created for a POJO. For each
@@ -283,9 +279,10 @@ private[camel] object ConsumerRegistered {
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* actorRef is not a consumer actor.
*/
- def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
- case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid))
- case _ => None
+ def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
+ Consumer.forConsumer[ConsumerRegistered](actorRef) {
+ target => ConsumerRegistered(actorRef, target.endpointUri, actorRef.uuid, target.blocking)
+ }
}
}
@@ -297,9 +294,10 @@ private[camel] object ConsumerUnregistered {
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* actorRef is not a consumer actor.
*/
- def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
- case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid))
- case _ => None
+ def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
+ Consumer.forConsumer[ConsumerUnregistered](actorRef) {
+ target => ConsumerUnregistered(actorRef, target.endpointUri, actorRef.uuid)
+ }
}
}
@@ -333,12 +331,15 @@ private[camel] object ConsumerMethodRegistered {
* have any @consume annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
- ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
+ ConsumerMethod.forConsumer(activeObject, init) {
m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
+/**
+ * @author Martin Krasser
+ */
private[camel] object ConsumerMethodUnregistered {
/**
* Creates a list of ConsumerMethodUnregistered event messages for an active object or an empty
@@ -346,38 +347,8 @@ private[camel] object ConsumerMethodUnregistered {
* have any @consume annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
- ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
+ ConsumerMethod.forConsumer(activeObject, init) {
m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
-
-/**
- * Describes a consumer actor with elements that are relevant for publishing an actor at a
- * Camel endpoint (or unpublishing an actor from an endpoint).
- *
- * @author Martin Krasser
- */
-private[camel] object ConsumerDescriptor {
-
- /**
- * An extractor that optionally creates a 4-tuple from a consumer actor reference containing
- * the actor reference itself, endpoint URI, identifier and a hint whether the identifier
- * is the actor uuid or actor id. If actorRef doesn't reference a consumer actor,
- * None is returned.
- */
- def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
- unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
-
- private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = {
- val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
- if (annotation eq null) None
- else if (actorRef.remoteAddress.isDefined) None
- else Some((actorRef, annotation.value, actorRef.id, false))
- }
-
- private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
- if (!actorRef.actor.isInstanceOf[Consumer]) None
- else if (actorRef.remoteAddress.isDefined) None
- else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
-}
diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala
index e80595515e..696588fcec 100644
--- a/akka-camel/src/main/scala/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/component/ActorComponent.scala
@@ -4,15 +4,24 @@
package se.scalablesolutions.akka.camel.component
-import java.lang.{RuntimeException, String}
+import java.net.InetSocketAddress
import java.util.{Map => JavaMap}
import java.util.concurrent.TimeoutException
+import java.util.concurrent.atomic.AtomicReference
-import org.apache.camel.{Exchange, Consumer, Processor}
+import jsr166x.Deque
+
+import org.apache.camel._
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
+import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
+import se.scalablesolutions.akka.stm.TransactionConfig
+
+import scala.reflect.BeanProperty
+
+import CamelMessageConversion.toExchangeAdapter
/**
* Camel component for sending messages to and receiving replies from actors.
@@ -41,7 +50,7 @@ class ActorComponent extends DefaultComponent {
/**
* Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI.
- * An actor can be referenced by its Actor.getId or its Actor.uuid.
+ * An actor can be referenced by its ActorRef.id or its ActorRef.uuid.
* Supported endpoint URI formats are
* actor:<actorid>,
* actor:id:<actorid> and
@@ -57,6 +66,12 @@ class ActorEndpoint(uri: String,
val id: Option[String],
val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
+ /**
+ * Blocking of client thread during two-way message exchanges with consumer actors. This is set
+ * via the blocking=true|false endpoint URI parameter. If omitted blocking is false.
+ */
+ @BeanProperty var blocking: Boolean = false
+
/**
* @throws UnsupportedOperationException
*/
@@ -75,48 +90,43 @@ class ActorEndpoint(uri: String,
}
/**
- * Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable,
- * the producer waits for a reply (using the !! operator), otherwise the ! operator is used
- * for sending the message.
+ * Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable and
+ * blocking is enabled then the producer waits for a reply (using the !! operator),
+ * otherwise the ! operator is used for sending the message.
*
* @see se.scalablesolutions.akka.camel.component.ActorComponent
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
*
* @author Martin Krasser
*/
-class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
- import CamelMessageConversion.toExchangeAdapter
+class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with AsyncProcessor {
+ import ActorProducer._
- implicit val sender = None
+ def process(exchange: Exchange) =
+ if (exchange.getPattern.isOutCapable) sendSync(exchange) else sendAsync(exchange)
- /**
- * Depending on the exchange pattern, this method either calls processInOut or
- * processInOnly for interacting with an actor. This methods looks up the actor
- * from the ActorRegistry according to this producer's endpoint URI.
- *
- * @param exchange represents the message exchange with the actor.
- */
- def process(exchange: Exchange) {
- val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
- if (exchange.getPattern.isOutCapable) processInOut(exchange, actor)
- else processInOnly(exchange, actor)
+ def process(exchange: Exchange, callback: AsyncCallback): Boolean = {
+ (exchange.getPattern.isOutCapable, ep.blocking) match {
+ case (true, true) => {
+ sendSync(exchange)
+ callback.done(true)
+ true
+ }
+ case (true, false) => {
+ sendAsync(exchange, Some(AsyncCallbackAdapter(exchange, callback)))
+ false
+ }
+ case (false, _) => {
+ sendAsync(exchange)
+ callback.done(true)
+ true
+ }
+ }
}
- /**
- * Send the exchange in-message to the given actor using the ! operator. The message
- * send to the actor is of type se.scalablesolutions.akka.camel.Message.
- */
- protected def processInOnly(exchange: Exchange, actor: ActorRef): Unit =
- actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
-
- /**
- * Send the exchange in-message to the given actor using the !! operator. The exchange
- * out-message is populated from the actor's reply message. The message sent to the
- * actor is of type se.scalablesolutions.akka.camel.Message.
- */
- protected def processInOut(exchange: Exchange, actor: ActorRef) {
- val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
- val result: Any = actor !! exchange.toRequestMessage(header)
+ private def sendSync(exchange: Exchange) = {
+ val actor = target
+ val result: Any = actor !! requestFor(exchange)
result match {
case Some(msg: Failure) => exchange.fromFailureMessage(msg)
@@ -128,7 +138,13 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
}
}
- private def target: Option[ActorRef] =
+ private def sendAsync(exchange: Exchange, sender: Option[ActorRef] = None) =
+ target.!(requestFor(exchange))(sender)
+
+ private def target =
+ targetOption getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
+
+ private def targetOption: Option[ActorRef] =
if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get)
@@ -141,6 +157,14 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid)
}
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ActorProducer {
+ def requestFor(exchange: Exchange) =
+ exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
+}
+
/**
* Thrown to indicate that an actor referenced by an endpoint URI cannot be
* found in the ActorRegistry.
@@ -150,3 +174,92 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
class ActorNotRegisteredException(uri: String) extends RuntimeException {
override def getMessage = "%s not registered" format uri
}
+
+/**
+ * @author Martin Krasser
+ */
+private[akka] object AsyncCallbackAdapter {
+ /**
+ * Creates and starts an AsyncCallbackAdapter.
+ *
+ * @param exchange message exchange to write results to.
+ * @param callback callback object to generate completion notifications.
+ */
+ def apply(exchange: Exchange, callback: AsyncCallback) =
+ new AsyncCallbackAdapter(exchange, callback).start
+}
+
+/**
+ * Adapts an AsyncCallback to ActorRef.!. Used by other actors to reply
+ * asynchronously to Camel with ActorRef.reply.
+ *
+ * Please note that this adapter can only be used locally at the moment which should not
+ * be a problem is most situations as Camel endpoints are only activated for local actor references,
+ * never for remote references.
+ *
+ * @author Martin Krasser
+ */
+private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef {
+
+ def start = {
+ _isRunning = true
+ this
+ }
+
+ def stop() = {
+ _isRunning = false
+ _isShutDown = true
+ }
+
+ /**
+ * Writes the reply message to exchange and uses callback to
+ * generate completion notifications.
+ *
+ * @param message reply message
+ * @param sender ignored
+ */
+ protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) = {
+ message match {
+ case msg: Failure => exchange.fromFailureMessage(msg)
+ case msg => exchange.fromResponseMessage(Message.canonicalize(msg))
+ }
+ callback.done(false)
+ }
+
+ def actorClass: Class[_ <: Actor] = unsupported
+ def actorClassName = unsupported
+ def dispatcher_=(md: MessageDispatcher): Unit = unsupported
+ def dispatcher: MessageDispatcher = unsupported
+ def transactionConfig_=(config: TransactionConfig): Unit = unsupported
+ def transactionConfig: TransactionConfig = unsupported
+ def makeTransactionRequired: Unit = unsupported
+ def makeRemote(hostname: String, port: Int): Unit = unsupported
+ def makeRemote(address: InetSocketAddress): Unit = unsupported
+ def homeAddress_=(address: InetSocketAddress): Unit = unsupported
+ def remoteAddress: Option[InetSocketAddress] = unsupported
+ def link(actorRef: ActorRef): Unit = unsupported
+ def unlink(actorRef: ActorRef): Unit = unsupported
+ def startLink(actorRef: ActorRef): Unit = unsupported
+ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported
+ def spawn[T <: Actor : Manifest]: ActorRef = unsupported
+ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported
+ def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported
+ def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
+ def shutdownLinkedActors: Unit = unsupported
+ def mailboxSize: Int = unsupported
+ def supervisor: Option[ActorRef] = unsupported
+ protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
+ protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
+ protected[akka] def restart(reason: Throwable): Unit = unsupported
+ protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
+ protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
+ protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported
+ protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
+ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
+ protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
+ protected[akka] def registerSupervisorAsRemoteActor = unsupported
+ protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
+ protected[this] def actorInstance: AtomicReference[Actor] = unsupported
+
+ private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName)
+}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
index 1e88b62bf2..a1989a0b4b 100644
--- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
+++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
@@ -2,6 +2,7 @@ package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import org.apache.camel.CamelExecutionException
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
@@ -62,24 +63,22 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
- given("a consumer actor that has been stopped")
+ given("a consumer actor registered after CamelService startup")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
var latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val consumer = actorOf(new TestConsumer(endpointUri)).start
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
+ when("the actor is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
consumer.stop
assert(latch.await(5000, TimeUnit.MILLISECONDS))
- // endpoint is still there but the route has been stopped
- assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
- when("a request is sent to this actor")
- val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
-
- then("the direct-endpoint falls back to its default behaviour and returns the original message")
- assert(response1 === "msg1")
+ then("the associated endpoint isn't accessible any more")
+ intercept[CamelExecutionException] {
+ CamelContextManager.template.requestBody(endpointUri, "msg1")
+ }
}
}
@@ -128,24 +127,26 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
- given("an active object that has been stopped")
+ given("an active object registered after CamelService startup")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ when("the active object is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
- when("requests are sent to published methods")
- val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
- val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
- val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
-
- then("the direct-endpoints fall back to their default behaviour and return the original message")
- assert(response1 === "x")
- assert(response2 === "x")
- assert(response3 === "x")
+ then("the associated endpoints aren't accessible any more")
+ intercept[CamelExecutionException] {
+ CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
+ }
+ intercept[CamelExecutionException] {
+ CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
+ }
+ intercept[CamelExecutionException] {
+ CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
+ }
}
}
}
diff --git a/akka-camel/src/test/scala/ConsumerRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala
index caaa03591b..3339caacf2 100644
--- a/akka-camel/src/test/scala/ConsumerRegisteredTest.scala
+++ b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala
@@ -5,17 +5,16 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
-import se.scalablesolutions.akka.actor.annotation.consume
object ConsumerRegisteredTest {
- @consume("mock:test1")
- class ConsumeAnnotatedActor extends Actor {
- self.id = "test"
+ class ConsumerActor1 extends Actor with Consumer {
+ def endpointUri = "mock:test1"
protected def receive = null
}
- class ConsumerActor extends Actor with Consumer {
+ class ConsumerActor2 extends Actor with Consumer {
def endpointUri = "mock:test2"
+ override def blocking = true
protected def receive = null
}
@@ -27,21 +26,14 @@ object ConsumerRegisteredTest {
class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
- @Test def shouldCreatePublishRequestList = {
- val a = actorOf[ConsumeAnnotatedActor]
- val as = List(a)
- val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
- assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false)))
+ @Test def shouldCreateSomeNonBlockingPublishRequest = {
+ val ca = actorOf[ConsumerActor1]
+ val event = ConsumerRegistered.forConsumer(ca)
+ assert(event === Some(ConsumerRegistered(ca, "mock:test1", ca.uuid, false)))
}
- @Test def shouldCreateSomePublishRequestWithActorId = {
- val a = actorOf[ConsumeAnnotatedActor]
- val event = ConsumerRegistered.forConsumer(a)
- assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false)))
- }
-
- @Test def shouldCreateSomePublishRequestWithActorUuid = {
- val ca = actorOf[ConsumerActor]
+ @Test def shouldCreateSomeBlockingPublishRequest = {
+ val ca = actorOf[ConsumerActor2]
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true)))
}
diff --git a/akka-camel/src/test/scala/ProducerFeatureTest.scala b/akka-camel/src/test/scala/ProducerFeatureTest.scala
index 96d1b9eeef..af6f4e73eb 100644
--- a/akka-camel/src/test/scala/ProducerFeatureTest.scala
+++ b/akka-camel/src/test/scala/ProducerFeatureTest.scala
@@ -8,12 +8,6 @@ import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, Feat
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.actor.Actor._
-object ProducerFeatureTest {
- class TestProducer(uri: String) extends Actor with Producer {
- def endpointUri = uri
- }
-}
-
class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
import ProducerFeatureTest._
@@ -121,6 +115,12 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
}
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
+}
+
+object ProducerFeatureTest {
+ class TestProducer(uri: String) extends Actor with Producer {
+ def endpointUri = uri
+ }
class TestRoute extends RouteBuilder {
def configure {
@@ -137,4 +137,4 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
})
}
}
-}
+}
\ No newline at end of file
diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala
index 44c6c30684..131f4fe2b5 100644
--- a/akka-camel/src/test/scala/PublishRequestorTest.scala
+++ b/akka-camel/src/test/scala/PublishRequestorTest.scala
@@ -62,7 +62,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
- Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true)))
+ Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, false)))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
@@ -70,7 +70,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
- Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
+ Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid)))
}
}
diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
index b7fd607f28..7d1482c36c 100644
--- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
+++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala
@@ -55,7 +55,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("two-way communication with timeout") {
val actor = actorOf[Tester3].start
intercept[RuntimeCamelException] {
- template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")
+ template.requestBody("actor:uuid:%s?blocking=true" format actor.uuid, "Martin")
}
}
}
diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
index 1a8e5c8db6..34d42debab 100644
--- a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
+++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java
@@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE, ElementType.METHOD})
+@Target({ElementType.METHOD})
public @interface consume {
public abstract String value();
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
index 7ab8b0dae5..b3e2a9cec5 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
@@ -1,6 +1,5 @@
package sample.camel
-import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor}
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging
@@ -42,8 +41,9 @@ class Consumer1 extends Actor with Consumer with Logging {
}
}
-@consume("jetty:http://0.0.0.0:8877/camel/default")
class Consumer2 extends Actor {
+ def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
+
def receive = {
case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
}
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index d41f453260..a74cc2f7f8 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -18,6 +18,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// project versions
val JERSEY_VERSION = "1.2"
val ATMO_VERSION = "0.6"
+ val CAMEL_VERSION = "2.4.0"
+ val SPRING_VERSION = "3.0.3.RELEASE"
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
@@ -69,6 +71,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
def codehausSnapshotRepo = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", codehausSnapshotRepo)
+ val apacheSnapshotRepo = "Camel 2.4 Staging" at "https://repository.apache.org/content/repositories/orgapachecamel-053/"
// ------------------------------------------------------------
// project defintions
@@ -239,7 +242,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
- val camel_core = "org.apache.camel" % "camel-core" % "2.3.0" % "compile"
+ val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" withSources()
}
class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
@@ -285,13 +288,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaKernelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath)
class AkkaSpringProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
- val spring_beans = "org.springframework" % "spring-beans" % "3.0.1.RELEASE" % "compile"
- val spring_context = "org.springframework" % "spring-context" % "3.0.1.RELEASE" % "compile"
+ val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile"
+ val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile"
// testing
- val camel_spring = "org.apache.camel" % "camel-spring" % "2.3.0" % "test"
- // enforce version 3.0.1.RELEASE otherwise version 2.5.6 is pulled via camel-spring
- val spring_tx = "org.springframework" % "spring-tx" % "3.0.1.RELEASE" % "test"
+ val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test"
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
}
@@ -339,9 +340,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin {
- val spring_jms = "org.springframework" % "spring-jms" % "3.0.1.RELEASE" % "compile"
- val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.3.0" % "compile"
- val camel_jms = "org.apache.camel" % "camel-jms" % "2.3.0" % "compile"
+ val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile"
+ val camel_jetty = "org.apache.camel" % "camel-jetty" % CAMEL_VERSION % "compile" withSources()
+ val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" withSources()
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile"
}