re #320 Non-blocking in-out message exchanges with actors

Additionally:
- upgrade to Camel 2.4
- drop @consume support for actors
- some refactorings
This commit is contained in:
Martin Krasser 2010-07-13 07:11:42 +02:00
parent a265a1be48
commit ba75746c95
11 changed files with 252 additions and 149 deletions

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
@ -12,9 +12,34 @@ import se.scalablesolutions.akka.actor.Actor
* @author Martin Krasser
*/
trait Consumer { self: Actor =>
/**
* Returns the Camel endpoint URI to consume messages from.
*/
def endpointUri: String
/**
* Determines whether two-way communications with this consumer actor should
* be done in blocking or non-blocking mode (default is non-blocking). One-way
* communications never block.
*/
def blocking = false
}
/**
* @author Martin Krasser
*/
private[camel] object Consumer {
/**
* Applies a function <code>f</code> to <code>actorRef</code> if <code>actorRef</code>
* references a consumer actor. A valid reference to a consumer actor is a local actor
* reference with a target actor that implements the <code>Consumer</code> trait. The
* target <code>Consumer</code> object is passed as argument to <code>f</code>. This
* method returns <code>None</code> if <code>actorRef</code> is not a valid reference
* to a consumer actor, <code>Some</code> result otherwise.
*/
def forConsumer[T](actorRef: ActorRef)(f: Consumer => T): Option[T] = {
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some(f(actorRef.actor.asInstanceOf[Consumer]))
}
}

View file

@ -24,7 +24,7 @@ private[camel] object ConsumerPublisher extends Logging {
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
@ -32,7 +32,7 @@ private[camel] object ConsumerPublisher extends Logging {
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
CamelContextManager.context.stopRoute(event.uuid)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
@ -139,14 +139,13 @@ private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) ext
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Actor.getId.
* @param uuid actor uuid
* @param blocking true for blocking in-out exchanges, false otherwise
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRoute(endpointUri: String, id: String, uuid: Boolean) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
private[camel] class ConsumerActorRoute(endpointUri: String, uuid: String, blocking: Boolean) extends ConsumerRoute(endpointUri, uuid) {
protected override def targetUri = "actor:uuid:%s?blocking=%s" format (uuid, blocking)
}
/**
@ -226,26 +225,23 @@ private[camel] sealed trait ConsumerEvent
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
* @param uuid actor uuid
* @param blocking true for blocking in-out exchanges, false otherwise
*
* @author Martin Krasser
*/
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, uuid: String, blocking: Boolean) extends ConsumerEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> is the actor's uuid, <code>false</code> if
* <code>id</code> is the actor's id.
* @param uuid actor uuid
*
* @author Martin Krasser
*/
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, id: String, uuid: Boolean) extends ConsumerEvent
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, uuid: String) extends ConsumerEvent
/**
* Event indicating that an active object proxy has been created for a POJO. For each
@ -283,9 +279,10 @@ private[camel] object ConsumerRegistered {
* Optionally creates an ConsumerRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerRegistered(ref, uri, id, uuid))
case _ => None
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
Consumer.forConsumer[ConsumerRegistered](actorRef) {
target => ConsumerRegistered(actorRef, target.endpointUri, actorRef.uuid, target.blocking)
}
}
}
@ -297,9 +294,10 @@ private[camel] object ConsumerUnregistered {
* Optionally creates an ConsumerUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = actorRef match {
case ConsumerDescriptor(ref, uri, id, uuid) => Some(ConsumerUnregistered(ref, uri, id, uuid))
case _ => None
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
Consumer.forConsumer[ConsumerUnregistered](actorRef) {
target => ConsumerUnregistered(actorRef, target.endpointUri, actorRef.uuid)
}
}
}
@ -333,12 +331,15 @@ private[camel] object ConsumerMethodRegistered {
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
ConsumerMethod.forConsumer(activeObject, init) {
m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
/**
* @author Martin Krasser
*/
private[camel] object ConsumerMethodUnregistered {
/**
* Creates a list of ConsumerMethodUnregistered event messages for an active object or an empty
@ -346,38 +347,8 @@ private[camel] object ConsumerMethodUnregistered {
* have any <code>@consume</code> annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
ConsumerMethod.forConsumer(activeObject, init) {
m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
}
}
}
/**
* Describes a consumer actor with elements that are relevant for publishing an actor at a
* Camel endpoint (or unpublishing an actor from an endpoint).
*
* @author Martin Krasser
*/
private[camel] object ConsumerDescriptor {
/**
* An extractor that optionally creates a 4-tuple from a consumer actor reference containing
* the actor reference itself, endpoint URI, identifier and a hint whether the identifier
* is the actor uuid or actor id. If <code>actorRef</code> doesn't reference a consumer actor,
* None is returned.
*/
def unapply(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
unapplyConsumerInstance(actorRef) orElse unapplyConsumeAnnotated(actorRef)
private def unapplyConsumeAnnotated(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] = {
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, annotation.value, actorRef.id, false))
}
private def unapplyConsumerInstance(actorRef: ActorRef): Option[(ActorRef, String, String, Boolean)] =
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.remoteAddress.isDefined) None
else Some((actorRef, actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
}

View file

@ -4,15 +4,24 @@
package se.scalablesolutions.akka.camel.component
import java.lang.{RuntimeException, String}
import java.net.InetSocketAddress
import java.util.{Map => JavaMap}
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import org.apache.camel.{Exchange, Consumer, Processor}
import jsr166x.Deque
import org.apache.camel._
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
import se.scalablesolutions.akka.stm.TransactionConfig
import scala.reflect.BeanProperty
import CamelMessageConversion.toExchangeAdapter
/**
* Camel component for sending messages to and receiving replies from actors.
@ -41,7 +50,7 @@ class ActorComponent extends DefaultComponent {
/**
* Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI.
* An actor can be referenced by its <code>Actor.getId</code> or its <code>Actor.uuid</code>.
* An actor can be referenced by its <code>ActorRef.id</code> or its <code>ActorRef.uuid</code>.
* Supported endpoint URI formats are
* <code>actor:&lt;actorid&gt;</code>,
* <code>actor:id:&lt;actorid&gt;</code> and
@ -57,6 +66,12 @@ class ActorEndpoint(uri: String,
val id: Option[String],
val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
/**
* Blocking of client thread during two-way message exchanges with consumer actors. This is set
* via the <code>blocking=true|false</code> endpoint URI parameter. If omitted blocking is false.
*/
@BeanProperty var blocking: Boolean = false
/**
* @throws UnsupportedOperationException
*/
@ -75,48 +90,43 @@ class ActorEndpoint(uri: String,
}
/**
* Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable,
* the producer waits for a reply (using the !! operator), otherwise the ! operator is used
* for sending the message.
* Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable and
* <code>blocking</code> is enabled then the producer waits for a reply (using the !! operator),
* otherwise the ! operator is used for sending the message.
*
* @see se.scalablesolutions.akka.camel.component.ActorComponent
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
*
* @author Martin Krasser
*/
class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
import CamelMessageConversion.toExchangeAdapter
class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with AsyncProcessor {
import ActorProducer._
implicit val sender = None
def process(exchange: Exchange) =
if (exchange.getPattern.isOutCapable) sendSync(exchange) else sendAsync(exchange)
/**
* Depending on the exchange pattern, this method either calls processInOut or
* processInOnly for interacting with an actor. This methods looks up the actor
* from the ActorRegistry according to this producer's endpoint URI.
*
* @param exchange represents the message exchange with the actor.
*/
def process(exchange: Exchange) {
val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
if (exchange.getPattern.isOutCapable) processInOut(exchange, actor)
else processInOnly(exchange, actor)
def process(exchange: Exchange, callback: AsyncCallback): Boolean = {
(exchange.getPattern.isOutCapable, ep.blocking) match {
case (true, true) => {
sendSync(exchange)
callback.done(true)
true
}
case (true, false) => {
sendAsync(exchange, Some(AsyncCallbackAdapter(exchange, callback)))
false
}
case (false, _) => {
sendAsync(exchange)
callback.done(true)
true
}
}
}
/**
* Send the exchange in-message to the given actor using the ! operator. The message
* send to the actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOnly(exchange: Exchange, actor: ActorRef): Unit =
actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
/**
* Send the exchange in-message to the given actor using the !! operator. The exchange
* out-message is populated from the actor's reply message. The message sent to the
* actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOut(exchange: Exchange, actor: ActorRef) {
val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
val result: Any = actor !! exchange.toRequestMessage(header)
private def sendSync(exchange: Exchange) = {
val actor = target
val result: Any = actor !! requestFor(exchange)
result match {
case Some(msg: Failure) => exchange.fromFailureMessage(msg)
@ -128,7 +138,13 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
}
}
private def target: Option[ActorRef] =
private def sendAsync(exchange: Exchange, sender: Option[ActorRef] = None) =
target.!(requestFor(exchange))(sender)
private def target =
targetOption getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
private def targetOption: Option[ActorRef] =
if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get)
@ -141,6 +157,14 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid)
}
/**
* @author Martin Krasser
*/
private[camel] object ActorProducer {
def requestFor(exchange: Exchange) =
exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
}
/**
* Thrown to indicate that an actor referenced by an endpoint URI cannot be
* found in the ActorRegistry.
@ -150,3 +174,92 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
class ActorNotRegisteredException(uri: String) extends RuntimeException {
override def getMessage = "%s not registered" format uri
}
/**
* @author Martin Krasser
*/
private[akka] object AsyncCallbackAdapter {
/**
* Creates and starts an <code>AsyncCallbackAdapter</code>.
*
* @param exchange message exchange to write results to.
* @param callback callback object to generate completion notifications.
*/
def apply(exchange: Exchange, callback: AsyncCallback) =
new AsyncCallbackAdapter(exchange, callback).start
}
/**
* Adapts an <code>AsyncCallback</code> to <code>ActorRef.!</code>. Used by other actors to reply
* asynchronously to Camel with <code>ActorRef.reply</code>.
* <p>
* <em>Please note</em> that this adapter can only be used locally at the moment which should not
* be a problem is most situations as Camel endpoints are only activated for local actor references,
* never for remote references.
*
* @author Martin Krasser
*/
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef {
def start = {
_isRunning = true
this
}
def stop() = {
_isRunning = false
_isShutDown = true
}
/**
* Writes the reply <code>message</code> to <code>exchange</code> and uses <code>callback</code> to
* generate completion notifications.
*
* @param message reply message
* @param sender ignored
*/
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) = {
message match {
case msg: Failure => exchange.fromFailureMessage(msg)
case msg => exchange.fromResponseMessage(Message.canonicalize(msg))
}
callback.done(false)
}
def actorClass: Class[_ <: Actor] = unsupported
def actorClassName = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeTransactionRequired: Unit = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
def remoteAddress: Option[InetSocketAddress] = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported
def spawn[T <: Actor : Manifest]: ActorRef = unsupported
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported
def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def shutdownLinkedActors: Unit = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def registerSupervisorAsRemoteActor = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[this] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName)
}

View file

@ -2,6 +2,7 @@ package se.scalablesolutions.akka.camel
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.camel.CamelExecutionException
import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
@ -62,24 +63,22 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
given("a consumer actor registered after CamelService startup")
assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null)
var latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val consumer = actorOf(new TestConsumer(endpointUri)).start
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
when("the actor is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(1)).as[CountDownLatch].get
consumer.stop
assert(latch.await(5000, TimeUnit.MILLISECONDS))
// endpoint is still there but the route has been stopped
assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null)
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
then("the direct-endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
then("the associated endpoint isn't accessible any more")
intercept[CamelExecutionException] {
CamelContextManager.template.requestBody(endpointUri, "msg1")
}
}
}
@ -128,24 +127,26 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
given("an active object that has been stopped")
given("an active object registered after CamelService startup")
var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("the active object is stopped")
latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
ActiveObject.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("the direct-endpoints fall back to their default behaviour and return the original message")
assert(response1 === "x")
assert(response2 === "x")
assert(response3 === "x")
then("the associated endpoints aren't accessible any more")
intercept[CamelExecutionException] {
CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
}
intercept[CamelExecutionException] {
CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
}
intercept[CamelExecutionException] {
CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
}
}
}
}

View file

@ -5,17 +5,16 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
object ConsumerRegisteredTest {
@consume("mock:test1")
class ConsumeAnnotatedActor extends Actor {
self.id = "test"
class ConsumerActor1 extends Actor with Consumer {
def endpointUri = "mock:test1"
protected def receive = null
}
class ConsumerActor extends Actor with Consumer {
class ConsumerActor2 extends Actor with Consumer {
def endpointUri = "mock:test2"
override def blocking = true
protected def receive = null
}
@ -27,21 +26,14 @@ object ConsumerRegisteredTest {
class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@Test def shouldCreatePublishRequestList = {
val a = actorOf[ConsumeAnnotatedActor]
val as = List(a)
val events = for (a <- as; e <- ConsumerRegistered.forConsumer(a)) yield e
assert(events === List(ConsumerRegistered(a, "mock:test1", "test", false)))
@Test def shouldCreateSomeNonBlockingPublishRequest = {
val ca = actorOf[ConsumerActor1]
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca, "mock:test1", ca.uuid, false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
val a = actorOf[ConsumeAnnotatedActor]
val event = ConsumerRegistered.forConsumer(a)
assert(event === Some(ConsumerRegistered(a, "mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = actorOf[ConsumerActor]
@Test def shouldCreateSomeBlockingPublishRequest = {
val ca = actorOf[ConsumerActor2]
val event = ConsumerRegistered.forConsumer(ca)
assert(event === Some(ConsumerRegistered(ca, "mock:test2", ca.uuid, true)))
}

View file

@ -8,12 +8,6 @@ import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, Feat
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.actor.Actor._
object ProducerFeatureTest {
class TestProducer(uri: String) extends Actor with Producer {
def endpointUri = uri
}
}
class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
import ProducerFeatureTest._
@ -121,6 +115,12 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
}
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
}
object ProducerFeatureTest {
class TestProducer(uri: String) extends Actor with Producer {
def endpointUri = uri
}
class TestRoute extends RouteBuilder {
def configure {
@ -137,4 +137,4 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
})
}
}
}
}

View file

@ -62,7 +62,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, true)))
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, false)))
}
@Test def shouldReceiveConsumerUnregisteredEvent = {
@ -70,7 +70,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid)))
}
}

View file

@ -55,7 +55,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("two-way communication with timeout") {
val actor = actorOf[Tester3].start
intercept[RuntimeCamelException] {
template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")
template.requestBody("actor:uuid:%s?blocking=true" format actor.uuid, "Martin")
}
}
}

View file

@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Target({ElementType.METHOD})
public @interface consume {
public abstract String value();

View file

@ -1,6 +1,5 @@
package sample.camel
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor}
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging
@ -42,8 +41,9 @@ class Consumer1 extends Actor with Consumer with Logging {
}
}
@consume("jetty:http://0.0.0.0:8877/camel/default")
class Consumer2 extends Actor {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
def receive = {
case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
}

View file

@ -18,6 +18,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// project versions
val JERSEY_VERSION = "1.2"
val ATMO_VERSION = "0.6"
val CAMEL_VERSION = "2.4.0"
val SPRING_VERSION = "3.0.3.RELEASE"
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
@ -69,6 +71,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
def codehausSnapshotRepo = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", codehausSnapshotRepo)
val apacheSnapshotRepo = "Camel 2.4 Staging" at "https://repository.apache.org/content/repositories/orgapachecamel-053/"
// ------------------------------------------------------------
// project defintions
@ -239,7 +242,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
val camel_core = "org.apache.camel" % "camel-core" % "2.3.0" % "compile"
val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" withSources()
}
class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
@ -285,13 +288,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaKernelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath)
class AkkaSpringProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
val spring_beans = "org.springframework" % "spring-beans" % "3.0.1.RELEASE" % "compile"
val spring_context = "org.springframework" % "spring-context" % "3.0.1.RELEASE" % "compile"
val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile"
val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile"
// testing
val camel_spring = "org.apache.camel" % "camel-spring" % "2.3.0" % "test"
// enforce version 3.0.1.RELEASE otherwise version 2.5.6 is pulled via camel-spring
val spring_tx = "org.springframework" % "spring-tx" % "3.0.1.RELEASE" % "test"
val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test"
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
}
@ -339,9 +340,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin {
val spring_jms = "org.springframework" % "spring-jms" % "3.0.1.RELEASE" % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.3.0" % "compile"
val camel_jms = "org.apache.camel" % "camel-jms" % "2.3.0" % "compile"
val spring_jms = "org.springframework" % "spring-jms" % SPRING_VERSION % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % CAMEL_VERSION % "compile" withSources()
val camel_jms = "org.apache.camel" % "camel-jms" % CAMEL_VERSION % "compile" withSources()
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile"
}