diff --git a/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/actor b/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/actor new file mode 100644 index 0000000000..a2141db8a9 --- /dev/null +++ b/akka-camel/src/main/resources/META-INF/services/org/apache/camel/component/actor @@ -0,0 +1 @@ +class=se.scalablesolutions.akka.camel.component.ActorComponent \ No newline at end of file diff --git a/akka-camel/src/main/scala/CamelContextLifecycle.scala b/akka-camel/src/main/scala/CamelContextLifecycle.scala new file mode 100644 index 0000000000..b9a696207c --- /dev/null +++ b/akka-camel/src/main/scala/CamelContextLifecycle.scala @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import org.apache.camel.{ProducerTemplate, CamelContext} +import org.apache.camel.impl.DefaultCamelContext + +import se.scalablesolutions.akka.util.Logging + +/** + * Defines the lifecycle of a CamelContext. Allowed state transitions are + * init -> start -> stop -> init -> ... etc. + * + * @author Martin Krasser + */ +trait CamelContextLifecycle extends Logging { + // TODO: enforce correct state transitions + // valid: init -> start -> stop -> init ... + + private var _context: CamelContext = _ + private var _template: ProducerTemplate = _ + + private var _initialized = false + private var _started = false + + /** + * Returns the managed CamelContext. + */ + protected def context: CamelContext = _context + + /** + * Returns the managed ProducerTemplate. + */ + protected def template: ProducerTemplate = _template + + /** + * Sets the managed CamelContext. + */ + protected def context_= (context: CamelContext) { _context = context } + + /** + * Sets the managed ProducerTemplate. + */ + protected def template_= (template: ProducerTemplate) { _template = template } + + def initialized = _initialized + def started = _started + + /** + * Starts the CamelContext and ProducerTemplate. + */ + def start = { + context.start + template.start + _started = true + log.info("Camel context started") + } + + /** + * Stops the CamelContext and ProducerTemplate. + */ + def stop = { + template.stop + context.stop + _initialized = false + _started = false + log.info("Camel context stopped") + } + + /** + * Initializes this lifecycle object with the a DefaultCamelContext. + */ + def init: Unit = init(new DefaultCamelContext) + + /** + * Initializes this lifecycle object with the given CamelContext. + */ + def init(context: CamelContext) { + this.context = context + this.template = context.createProducerTemplate + _initialized = true + log.info("Camel context initialized") + } +} + +/** + * Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle + * of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService. + */ +object CamelContextManager extends CamelContextLifecycle { + override def context: CamelContext = super.context + override def template: ProducerTemplate = super.template +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala new file mode 100644 index 0000000000..27ec98b25d --- /dev/null +++ b/akka-camel/src/main/scala/Consumer.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import se.scalablesolutions.akka.actor.Actor + +/** + * Mixed in by Actor implementations that consume message from Camel endpoints. + * + * @author Martin Krasser + */ +trait Consumer { self: Actor => + + /** + * Returns the Camel endpoint URI to consume messages from. + */ + def endpointUri: String +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/Message.scala b/akka-camel/src/main/scala/Message.scala new file mode 100644 index 0000000000..8e0156c669 --- /dev/null +++ b/akka-camel/src/main/scala/Message.scala @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import org.apache.camel.{Exchange, Message => CamelMessage} +import org.apache.camel.util.ExchangeHelper + +import scala.collection.jcl.{Map => MapWrapper} + +/** + * An immutable representation of a Camel message. Actor classes that mix in + * se.scalablesolutions.akka.camel.Producer or + * se.scalablesolutions.akka.camel.Consumer use this message type for communication. + * + * @author Martin Krasser + */ +case class Message(val body: Any, val headers: Map[String, Any]) { + /** + * Creates a message with a body and an empty header map. + */ + def this(body: Any) = this(body, Map.empty) + + /** + * Returns the body of the message converted to the type given by the clazz + * argument. Conversion is done using Camel's type converter. The type converter is obtained + * from the CamelContext managed by CamelContextManager. Applications have to ensure proper + * initialization of CamelContextManager. + * + * @see CamelContextManager. + */ + def bodyAs[T](clazz: Class[T]): T = + CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, body) + + /** + * Returns those headers from this message whose name is contained in names. + */ + def headers(names: Set[String]): Map[String, Any] = headers.filter(names contains _._1) + + /** + * Creates a Message with a new body using a transformer function. + */ + def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A])) + + /** + * Creates a Message with a new body converted to type clazz. + * + * @see Message#bodyAs(Class) + */ + def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz)) + + /** + * Creates a Message with a new body. + */ + def setBody(body: Any) = new Message(body, this.headers) + + /** + * Creates a new Message with new headers. + */ + def setHeaders(headers: Map[String, Any]) = new Message(this.body, headers) + + /** + * Creates a new Message with the headers argument added to the existing headers. + */ + def addHeaders(headers: Map[String, Any]) = new Message(this.body, this.headers ++ headers) + + /** + * Creates a new Message with the header argument added to the existing headers. + */ + def addHeader(header: (String, Any)) = new Message(this.body, this.headers + header) + + /** + * Creates a new Message where the header with name headerName is removed from + * the existing headers. + */ + def removeHeader(headerName: String) = new Message(this.body, this.headers - headerName) +} + +/** + * Companion object of Message class. + * + * @author Martin Krasser + */ +object Message { + + /** + * Message header to correlate request with response messages. Applications that send + * messages to a Producer actor may want to set this header on the request message + * so that it can be correlated with an asynchronous response. Messages send to Consumer + * actors have this header already set. + */ + val MessageExchangeId = "MessageExchangeId".intern + + /** + * Creates a new Message with body as message body and an empty header map. + */ + def apply(body: Any) = new Message(body) + + /** + * Creates a canonical form of the given message msg. If msg of type + * Message then msg is returned, otherwise msg is set as body of a + * newly created Message object. + */ + def canonicalize(msg: Any) = msg match { + case mobj: Message => mobj + case body => new Message(body) + } +} + +/** + * An immutable representation of a failed Camel exchange. It contains the failure cause + * obtained from Exchange.getException and the headers from either the Exchange.getIn + * message or Exchange.getOut message, depending on the exchange pattern. + * + * @author Martin Krasser + */ +case class Failure(val cause: Exception, val headers: Map[String, Any]) + +/** + * Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects. + * + * @author Martin Krasser + */ +class CamelExchangeAdapter(exchange: Exchange) { + + import CamelMessageConversion.toMessageAdapter + + /** + * Sets Exchange.getIn from the given Message object. + */ + def fromRequestMessage(msg: Message): Exchange = { requestMessage.fromMessage(msg); exchange } + + /** + * Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given + * Message object. If the exchange is out-capable then the Exchange.getOut is set, otherwise + * Exchange.getIn. + */ + def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange } + + /** + * Sets Exchange.getException from the given Failure message. Headers of the Failure message + * are ignored. + */ + def fromFailureMessage(msg: Failure): Exchange = { exchange.setException(msg.cause); exchange } + + /** + * Creates a Message object from Exchange.getIn. + */ + def toRequestMessage: Message = toRequestMessage(Map.empty) + + /** + * Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut. + * If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn. + */ + def toResponseMessage: Message = toResponseMessage(Map.empty) + + /** + * Creates a Failure object from the adapted Exchange. + * + * @see Failure + */ + def toFailureMessage: Failure = toFailureMessage(Map.empty) + + /** + * Creates a Message object from Exchange.getIn. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toRequestMessage(headers: Map[String, Any]): Message = requestMessage.toMessage(headers) + + /** + * Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut. + * If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toResponseMessage(headers: Map[String, Any]): Message = responseMessage.toMessage(headers) + + /** + * Creates a Failure object from the adapted Exchange. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + * + * @see Failure + */ + def toFailureMessage(headers: Map[String, Any]): Failure = + Failure(exchange.getException, headers ++ responseMessage.toMessage.headers) + + private def requestMessage = exchange.getIn + + private def responseMessage = ExchangeHelper.getResultMessage(exchange) + +} + +/** + * Adapter for converting an org.apache.camel.Message to and from Message objects. + * + * @author Martin Krasser + */ +class CamelMessageAdapter(val cm: CamelMessage) { + /** + * Set the adapted Camel message from the given Message object. + */ + def fromMessage(m: Message): CamelMessage = { + cm.setBody(m.body) + for (h <- m.headers) cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef]) + cm + } + + /** + * Creates a new Message object from the adapted Camel message. + */ + def toMessage: Message = toMessage(Map.empty) + + /** + * Creates a new Message object from the adapted Camel message. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toMessage(headers: Map[String, Any]): Message = Message(cm.getBody, cmHeaders(headers, cm)) + + private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = + headers ++ MapWrapper[String, AnyRef](cm.getHeaders).elements +} + +/** + * Defines conversion methods to CamelExchangeAdapter and CamelMessageAdapter. + * Imported by applications + * that implicitly want to use conversion methods of CamelExchangeAdapter and CamelMessageAdapter. + */ +object CamelMessageConversion { + + /** + * Creates an CamelExchangeAdapter for the given Camel exchange. + */ + implicit def toExchangeAdapter(ce: Exchange): CamelExchangeAdapter = + new CamelExchangeAdapter(ce) + + /** + * Creates an CamelMessageAdapter for the given Camel message. + */ + implicit def toMessageAdapter(cm: CamelMessage): CamelMessageAdapter = + new CamelMessageAdapter(cm) +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala new file mode 100644 index 0000000000..43e9b8b10e --- /dev/null +++ b/akka-camel/src/main/scala/Producer.scala @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import CamelMessageConversion.toExchangeAdapter + +import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate} +import org.apache.camel.impl.DefaultExchange +import org.apache.camel.spi.Synchronization + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.dispatch.CompletableFuture +import se.scalablesolutions.akka.util.Logging + +/** + * Mixed in by Actor implementations that produce messages to Camel endpoints. + * + * @author Martin Krasser + */ +trait Producer { self: Actor => + + private val headersToCopyDefault = Set(Message.MessageExchangeId) + + /** + * If set to true (default), communication with the Camel endpoint is done via the Camel + * Async API. Camel then processes the + * message in a separate thread. If set to false, the actor thread is blocked until Camel + * has finished processing the produced message. + */ + def async: Boolean = true + + /** + * If set to false (default), this producer expects a response message from the Camel endpoint. + * If set to true, this producer communicates with the Camel endpoint with an in-only message + * exchange pattern (fire and forget). + */ + def oneway: Boolean = false + + /** + * Returns the Camel endpoint URI to produce messages to. + */ + def endpointUri: String + + /** + * Returns the names of message headers to copy from a request message to a response message. + * By default only the Message.MessageExchangeId is copied. Applications may override this to + * define an application-specific set of message headers to copy. + */ + def headersToCopy: Set[String] = headersToCopyDefault + + /** + * Returns the producer template from the CamelContextManager. Applications either have to ensure + * proper initialization of CamelContextManager or override this method. + * + * @see CamelContextManager. + */ + protected def template: ProducerTemplate = CamelContextManager.template + + /** + * Initiates a one-way (in-only) message exchange to the Camel endpoint given by + * endpointUri. This method blocks until Camel finishes processing + * the message exchange. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + */ + protected def produceOneway(msg: Any): Unit = + template.send(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg))) + + /** + * Initiates a one-way (in-only) message exchange to the Camel endpoint given by + * endpointUri. This method triggers asynchronous processing of the + * message exchange by Camel. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + */ + protected def produceOnewayAsync(msg: Any): Unit = + template.asyncSend( + endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg))) + + /** + * Initiates a two-way (in-out) message exchange to the Camel endpoint given by + * endpointUri. This method blocks until Camel finishes processing + * the message exchange. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + * @return either a response Message or a Failure object. + */ + protected def produce(msg: Any): Any = { + val cmsg = Message.canonicalize(msg) + val requestProcessor = new Processor() { + def process(exchange: Exchange) = exchange.fromRequestMessage(cmsg) + } + val result = template.request(endpointUri, requestProcessor) + if (result.isFailed) result.toFailureMessage(cmsg.headers(headersToCopy)) + else result.toResponseMessage(cmsg.headers(headersToCopy)) + } + + /** + * Initiates a two-way (in-out) message exchange to the Camel endpoint given by + * endpointUri. This method triggers asynchronous processing of the + * message exchange by Camel. The response message is returned asynchronously to + * the original sender (or sender future). + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + * @return either a response Message or a Failure object. + * @see ProducerResponseSender + */ + protected def produceAsync(msg: Any): Unit = { + val cmsg = Message.canonicalize(msg) + val sync = new ProducerResponseSender( + cmsg.headers(headersToCopy), this.sender, this.senderFuture, this) + template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) + } + + /** + * Default implementation for Actor.receive. Implementors may choose to + * def receive = produce. This partial function calls one of + * the protected produce methods depending on the return values of + * oneway and async. + */ + protected def produce: PartialFunction[Any, Unit] = { + case msg => { + if ( oneway && !async) produceOneway(msg) + else if ( oneway && async) produceOnewayAsync(msg) + else if (!oneway && !async) reply(produce(msg)) + else /*(!oneway && async)*/ produceAsync(msg) + } + } + + /** + * Creates a new in-only Exchange. + */ + protected def createInOnlyExchange: Exchange = createExchange(ExchangePattern.InOnly) + + /** + * Creates a new in-out Exchange. + */ + protected def createInOutExchange: Exchange = createExchange(ExchangePattern.InOut) + + /** + * Creates a new Exchange with given pattern from the CamelContext managed by + * CamelContextManager. Applications either have to ensure proper initialization + * of CamelContextManager or override this method. + * + * @see CamelContextManager. + */ + protected def createExchange(pattern: ExchangePattern): Exchange = + new DefaultExchange(CamelContextManager.context, pattern) +} + +/** + * Synchronization object that sends responses asynchronously to initial senders. This + * class is used by Producer for asynchronous two-way messaging with a Camel endpoint. + * + * @author Martin Krasser + */ +class ProducerResponseSender( + headers: Map[String, Any], + sender: Option[Actor], + senderFuture: Option[CompletableFuture], + producer: Actor) extends Synchronization with Logging { + + implicit val producerActor = Some(producer) // the response sender + + /** + * Replies a Failure message, created from the given exchange, to sender (or + * senderFuture if applicable). + */ + def onFailure(exchange: Exchange) = reply(exchange.toFailureMessage(headers)) + + /** + * Replies a response Message, created from the given exchange, to sender (or + * senderFuture if applicable). + */ + def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) + + private def reply(message: Any) = { + sender match { + case Some(actor) => actor ! message + case None => senderFuture match { + case Some(future) => future.completeWithResult(message) + case None => log.warning("No destination for sending response") + } + } + } +} diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala new file mode 100644 index 0000000000..763f9dd017 --- /dev/null +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.component + +import java.lang.{RuntimeException, String} +import java.util.{Map => JavaMap} +import java.util.concurrent.TimeoutException + +import org.apache.camel.{Exchange, Consumer, Processor} +import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} + +import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} +import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message} + +/** + * Camel component for sending messages to and receiving replies from actors. + * + * @see se.scalablesolutions.akka.camel.component.ActorEndpoint + * @see se.scalablesolutions.akka.camel.component.ActorProducer + * + * @author Martin Krasser + */ +class ActorComponent extends DefaultComponent { + def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = { + val idAndUuid = idAndUuidPair(remaining) + new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2) + } + + private def idAndUuidPair(remaining: String): Tuple2[Option[String], Option[String]] = { + remaining split ":" toList match { + case id :: Nil => (Some(id), None) + case "id" :: id :: Nil => (Some(id), None) + case "uuid" :: uuid :: Nil => (None, Some(uuid)) + case _ => throw new IllegalArgumentException( + "invalid path format: %s - should be or id: or uuid:" format remaining) + } + } +} + +/** + * Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI. + * An actor can be referenced by its Actor.getId or its Actor.uuid. + * Supported endpoint URI formats are + * actor:<actorid>, + * actor:id:<actorid> and + * actor:uuid:<actoruuid>. + * + * @see se.scalablesolutions.akka.camel.component.ActorComponent + * @see se.scalablesolutions.akka.camel.component.ActorProducer + + * @author Martin Krasser + */ +class ActorEndpoint(uri: String, + comp: ActorComponent, + val id: Option[String], + val uuid: Option[String]) extends DefaultEndpoint(uri, comp) { + + /** + * @throws UnsupportedOperationException + */ + def createConsumer(processor: Processor): Consumer = + throw new UnsupportedOperationException("actor consumer not supported yet") + + /** + * Creates a new ActorProducer instance initialized with this endpoint. + */ + def createProducer: ActorProducer = new ActorProducer(this) + + /** + * Returns true. + */ + def isSingleton: Boolean = true +} + +/** + * Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable, + * the producer waits for a reply (using the !! operator), otherwise the ! operator is used + * for sending the message. + * + * @see se.scalablesolutions.akka.camel.component.ActorComponent + * @see se.scalablesolutions.akka.camel.component.ActorEndpoint + * + * @author Martin Krasser + */ +class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { + import CamelMessageConversion.toExchangeAdapter + + implicit val sender = None + + /** + * Depending on the exchange pattern, this method either calls processInOut or + * processInOnly for interacting with an actor. This methods looks up the actor + * from the ActorRegistry according to this producer's endpoint URI. + * + * @param exchange represents the message exchange with the actor. + */ + def process(exchange: Exchange) { + val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri)) + if (exchange.getPattern.isOutCapable) processInOut(exchange, actor) + else processInOnly(exchange, actor) + } + + /** + * Send the exchange in-message to the given actor using the ! operator. The message + * send to the actor is of type se.scalablesolutions.akka.camel.Message. + */ + protected def processInOnly(exchange: Exchange, actor: Actor): Unit = + actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) + + /** + * Send the exchange in-message to the given actor using the !! operator. The exchange + * out-message is populated from the actor's reply message. The message sent to the + * actor is of type se.scalablesolutions.akka.camel.Message. + */ + protected def processInOut(exchange: Exchange, actor: Actor) { + val header = Map(Message.MessageExchangeId -> exchange.getExchangeId) + val result: Any = actor !! exchange.toRequestMessage(header) + + result match { + case Some(msg: Failure) => exchange.fromFailureMessage(msg) + case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) + case None => { + throw new TimeoutException("timeout (%d ms) while waiting response from %s" + format (actor.timeout, ep.getEndpointUri)) + } + } + } + + private def target: Option[Actor] = + if (ep.id.isDefined) targetById(ep.id.get) + else targetByUuid(ep.uuid.get) + + private def targetById(id: String) = ActorRegistry.actorsFor(id) match { + case Nil => None + case actor :: Nil => Some(actor) + case actors => Some(actors.first) + } + + private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid) +} + +/** + * Thrown to indicate that an actor referenced by an endpoint URI cannot be + * found in the ActorRegistry. + * + * @author Martin Krasser + */ +class ActorNotRegisteredException(uri: String) extends RuntimeException { + override def getMessage = "%s not registered" format uri +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala new file mode 100644 index 0000000000..86b4f2dc23 --- /dev/null +++ b/akka-camel/src/main/scala/service/CamelService.scala @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.service + +import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.camel.CamelContextManager +import se.scalablesolutions.akka.util.{Bootable, Logging} + +/** + * Used by applications (and the Kernel) to publish consumer actors via Camel + * endpoints and to manage the life cycle of a a global CamelContext which can + * be accessed via se.scalablesolutions.akka.camel.CamelContextManager. + * + * @author Martin Krasser + */ +trait CamelService extends Bootable with Logging { + + import se.scalablesolutions.akka.actor.Actor.Sender.Self + import CamelContextManager._ + + private[camel] val consumerPublisher = new ConsumerPublisher + private[camel] val publishRequestor = new PublishRequestor(consumerPublisher) + + /** + * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously) + * published as Camel endpoint. Consumer actors that are started after this method returned will + * be published as well. Actor publishing is done asynchronously. + */ + abstract override def onLoad = { + super.onLoad + + // Only init and start if not already done by application + if (!initialized) init + if (!started) start + + // Camel should cache input streams + context.setStreamCaching(true) + + // start actor that exposes consumer actors via Camel endpoints + consumerPublisher.start + + // add listener for actor registration events + ActorRegistry.addRegistrationListener(publishRequestor.start) + + // publish already registered consumer actors + for (publish <- Publish.forConsumers(ActorRegistry.actors)) consumerPublisher ! publish + } + + /** + * Stops the CamelService. + */ + abstract override def onUnload = { + ActorRegistry.removeRegistrationListener(publishRequestor) + publishRequestor.stop + consumerPublisher.stop + stop + super.onUnload + } + + /** + * Starts the CamelService. + * + * @see onLoad + */ + def load = onLoad + + /** + * Stops the CamelService. + * + * @see onUnload + */ + def unload = onUnload +} + +/** + * CamelService companion object used by standalone applications to create their own + * CamelService instance. + * + * @author Martin Krasser + */ +object CamelService { + + /** + * Creates a new CamelService instance. + */ + def newInstance: CamelService = new CamelService {} +} diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala new file mode 100644 index 0000000000..dee30882f1 --- /dev/null +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.camel.service + +import java.io.InputStream +import java.util.concurrent.CountDownLatch + +import org.apache.camel.builder.RouteBuilder + +import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor} +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} +import se.scalablesolutions.akka.util.Logging + +/** + * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed + * by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type + * se.scalablesolutions.akka.camel.service.Publish. + * + * @author Martin Krasser + */ +class ConsumerPublisher extends Actor with Logging { + @volatile private var latch = new CountDownLatch(0) + + /** + * Adds a route to the actor identified by a Publish message to the global CamelContext. + */ + protected def receive = { + case p: Publish => publish(new ConsumerRoute(p.endpointUri, p.id, p.uuid)) + case _ => { /* ignore */} + } + + /** + * Sets the number of expected Publish messages received by this actor. Used for testing + * only. + */ + private[camel] def expectPublishCount(count: Int): Unit = latch = new CountDownLatch(count) + + /** + * Waits for the number of expected Publish messages to arrive. Used for testing only. + */ + private[camel] def awaitPublish = latch.await + + private def publish(route: ConsumerRoute) { + CamelContextManager.context.addRoutes(route) + log.info("published actor via endpoint %s" format route.endpointUri) + latch.countDown // needed for testing only. + } +} + +/** + * Defines the route to a consumer actor. + * + * @param endpointUri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id refers to Actor.uuid, false if + * id refers to Acotr.getId. + * + * @author Martin Krasser + */ +class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder { + // TODO: make conversions configurable + private val bodyConversions = Map( + "file" -> classOf[InputStream] + ) + + def configure = { + val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + bodyConversions.get(schema) match { + case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(actorUri) + case None => from(endpointUri).to(actorUri) + } + } + + private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id +} + +/** + * A registration listener that publishes consumer actors (and ignores other actors). + * + * @author Martin Krasser + */ +class PublishRequestor(consumerPublisher: Actor) extends Actor { + protected def receive = { + case ActorUnregistered(actor) => { /* ignore */ } + case ActorRegistered(actor) => Publish.forConsumer(actor) match { + case Some(publish) => consumerPublisher ! publish + case None => { /* ignore */ } + } + } +} + +/** + * Request message for publishing a consumer actor. + * + * @param endpointUri endpoint URI of the consumer actor + * @param id actor identifier + * @param uuid true if id refers to Actor.uuid, false if + * id refers to Acotr.getId. + * + * @author Martin Krasser + */ +case class Publish(endpointUri: String, id: String, uuid: Boolean) + +/** + * @author Martin Krasser + */ +object Publish { + + /** + * Creates a list of Publish request messages for all consumer actors in the actors + * list. + */ + def forConsumers(actors: List[Actor]): List[Publish] = + for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get + + /** + * Creates a Publish request message if actor is a consumer actor. + */ + def forConsumer(actor: Actor): Option[Publish] = + forConsumeAnnotated(actor) orElse forConsumerType(actor) + + private def forConsumeAnnotated(actor: Actor): Option[Publish] = { + val annotation = actor.getClass.getAnnotation(classOf[consume]) + if (annotation eq null) None + else if (actor._remoteAddress.isDefined) None // do not publish proxies + else Some(Publish(annotation.value, actor.getId, false)) + } + + private def forConsumerType(actor: Actor): Option[Publish] = + if (!actor.isInstanceOf[Consumer]) None + else if (actor._remoteAddress.isDefined) None + else Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true)) +} diff --git a/akka-camel/src/test/scala/MessageTest.scala b/akka-camel/src/test/scala/MessageTest.scala new file mode 100644 index 0000000000..d519dbafa7 --- /dev/null +++ b/akka-camel/src/test/scala/MessageTest.scala @@ -0,0 +1,79 @@ +package se.scalablesolutions.akka.camel + +import java.io.InputStream + +import org.apache.camel.NoTypeConversionAvailableException +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuite + +import org.junit.Test + +class MessageTest extends JUnitSuite { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + @Test def shouldConvertDoubleBodyToString = { + CamelContextManager.init + assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String])) + } + + @Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream { + CamelContextManager.init + intercept[NoTypeConversionAvailableException] { + Message(1.4, null).bodyAs(classOf[InputStream]) + } + } + + @Test def shouldReturnSubsetOfHeaders = { + val message = Message("test" , Map("A" -> "1", "B" -> "2")) + assertEquals(Map("B" -> "2"), message.headers(Set("B"))) + } + + @Test def shouldTransformBodyAndPreserveHeaders = { + assertEquals( + Message("ab", Map("A" -> "1")), + Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b")) + } + + @Test def shouldConvertBodyAndPreserveHeaders = { + CamelContextManager.init + assertEquals( + Message("1.4", Map("A" -> "1")), + Message(1.4 , Map("A" -> "1")).setBodyAs(classOf[String])) + } + + @Test def shouldSetBodyAndPreserveHeaders = { + assertEquals( + Message("test2" , Map("A" -> "1")), + Message("test1" , Map("A" -> "1")).setBody("test2")) + } + + @Test def shouldSetHeadersAndPreserveBody = { + assertEquals( + Message("test1" , Map("C" -> "3")), + Message("test1" , Map("A" -> "1")).setHeaders(Map("C" -> "3"))) + + } + + @Test def shouldAddHeaderAndPreserveBodyAndHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1", "B" -> "2")), + Message("test1" , Map("A" -> "1")).addHeader("B" -> "2")) + } + + @Test def shouldAddHeadersAndPreserveBodyAndHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1", "B" -> "2")), + Message("test1" , Map("A" -> "1")).addHeaders(Map("B" -> "2"))) + } + + @Test def shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1")), + Message("test1" , Map("A" -> "1", "B" -> "2")).removeHeader("B")) + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/ProducerTest.scala b/akka-camel/src/test/scala/ProducerTest.scala new file mode 100644 index 0000000000..11ae148fb5 --- /dev/null +++ b/akka-camel/src/test/scala/ProducerTest.scala @@ -0,0 +1,109 @@ +package se.scalablesolutions.akka.camel + +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.component.mock.MockEndpoint +import org.junit.Assert._ +import org.junit.{Test, After, Before} +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor + +class ProducerTest extends JUnitSuite { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + import CamelContextManager._ + + var mock: MockEndpoint = _ + + @Before def setUp = { + init + context.addRoutes(new TestRouteBuilder) + start + mock = context.getEndpoint("mock:mock", classOf[MockEndpoint]) + } + + @After def tearDown = { + stop + } + + // + // TODO: test replies to messages sent with ! (bang) + // TODO: test copying of custom message headers + // + + @Test def shouldProduceMessageSyncAndReceiveResponse = { + val producer = new TestProducer("direct:input2", false, false).start + val message = Message("test1", Map(Message.MessageExchangeId -> "123")) + val expected = Message("Hello test1", Map(Message.MessageExchangeId -> "123")) + assertEquals(expected, producer !! message get) + producer.stop + } + + @Test def shouldProduceMessageSyncAndReceiveFailure = { + val producer = new TestProducer("direct:input2", false, false).start + val message = Message("fail", Map(Message.MessageExchangeId -> "123")) + val result = producer.!![Failure](message).get + assertEquals("failure", result.cause.getMessage) + assertEquals(Map(Message.MessageExchangeId -> "123"), result.headers) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveResponse = { + val producer = new TestProducer("direct:input2", true, false).start + val message = Message("test2", Map(Message.MessageExchangeId -> "124")) + val expected = Message("Hello test2", Map(Message.MessageExchangeId -> "124")) + assertEquals(expected, producer !! message get) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveFailure = { + val producer = new TestProducer("direct:input2", true, false).start + val message = Message("fail", Map(Message.MessageExchangeId -> "124")) + val result = producer.!![Failure](message).get + assertEquals("failure", result.cause.getMessage) + assertEquals(Map(Message.MessageExchangeId -> "124"), result.headers) + producer.stop + } + + @Test def shouldProduceMessageSyncWithoutReceivingResponse = { + val producer = new TestProducer("direct:input1", false, true).start + mock.expectedBodiesReceived("test3") + producer.!("test3")(None) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveResponseSync = { + val producer = new TestProducer("direct:input1", true, true).start + mock.expectedBodiesReceived("test4") + producer.!("test4")(None) + producer.stop + } + + class TestProducer(uri:String, prodAsync: Boolean, prodOneway: Boolean) extends Actor with Producer { + override def async = prodAsync + override def oneway = prodOneway + def endpointUri = uri + def receive = produce + } + + class TestRouteBuilder extends RouteBuilder { + def configure { + from("direct:input1").to("mock:mock") + from("direct:input2").process(new Processor() { + def process(exchange: Exchange) = { + val body = exchange.getIn.getBody + body match { + case "fail" => throw new Exception("failure") + case body => exchange.getOut.setBody("Hello %s" format body) + } + } + }) + } + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/component/ActorComponentTest.scala b/akka-camel/src/test/scala/component/ActorComponentTest.scala new file mode 100644 index 0000000000..a856b9ac25 --- /dev/null +++ b/akka-camel/src/test/scala/component/ActorComponentTest.scala @@ -0,0 +1,49 @@ +package se.scalablesolutions.akka.camel.component + +import org.junit._ +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.{CamelContextLifecycle, Message} + +class ActorComponentTest extends JUnitSuite with CamelContextLifecycle { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + @Before def setUp = { + init + start + } + + @After def tearDown = { + stop + } + + @Test def shouldReceiveResponseFromActorReferencedById = { + val actor = new TestActor + actor.start + assertEquals("Hello Martin", template.requestBody("actor:%s" format actor.getId, "Martin")) + assertEquals("Hello Martin", template.requestBody("actor:id:%s" format actor.getId, "Martin")) + actor.stop + } + + @Test def shouldReceiveResponseFromActorReferencedByUuid = { + val actor = new TestActor + actor.start + assertEquals("Hello Martin", template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")) + actor.stop + } + + class TestActor extends Actor { + protected def receive = { + case msg: Message => reply("Hello %s" format msg.body) + } + } + +} + + diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala new file mode 100644 index 0000000000..954a4d21cd --- /dev/null +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -0,0 +1,42 @@ +package se.scalablesolutions.akka.camel.component + +import org.apache.camel.{CamelContext, ExchangePattern} +import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange} +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Message + +class ActorProducerTest extends JUnitSuite { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + val context = new DefaultCamelContext + val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName) + val producer = endpoint.createProducer + + @Test def shouldSendAndReceiveMessageBodyAndHeaders = { + val exchange = new DefaultExchange(null.asInstanceOf[CamelContext], ExchangePattern.InOut) + val actor = new TestActor + actor.start + exchange.getIn.setBody("Martin") + exchange.getIn.setHeader("k1", "v1") + producer.process(exchange) + assertEquals("Hello Martin", exchange.getOut.getBody) + assertEquals("v1", exchange.getOut.getHeader("k1")) + assertEquals("v2", exchange.getOut.getHeader("k2")) + actor.stop + } + + class TestActor extends Actor { + protected def receive = { + case msg: Message => reply(Message("Hello %s" format msg.body, Map("k2" -> "v2") ++ msg.headers)) + } + } + +} diff --git a/akka-camel/src/test/scala/service/CamelServiceTest.scala b/akka-camel/src/test/scala/service/CamelServiceTest.scala new file mode 100644 index 0000000000..8fafea4687 --- /dev/null +++ b/akka-camel/src/test/scala/service/CamelServiceTest.scala @@ -0,0 +1,103 @@ +package se.scalablesolutions.akka.camel.service + +import org.apache.camel.builder.RouteBuilder +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message} +import org.junit.{Ignore, Before, After, Test} + +class CamelServiceTest extends JUnitSuite with CamelService { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + import CamelContextManager._ + + var actor1: Actor = _ + var actor2: Actor = _ + var actor3: Actor = _ + + @Before def setUp = { + // register actors before starting the CamelService + actor1 = new TestActor1().start + actor2 = new TestActor2().start + actor3 = new TestActor3().start + // initialize global CamelContext + init + // customize global CamelContext + context.addRoutes(new TestRouteBuilder) + consumerPublisher.expectPublishCount(2) + load + consumerPublisher.awaitPublish + } + + @After def tearDown = { + unload + actor1.stop + actor2.stop + actor3.stop + } + + @Test def shouldReceiveResponseViaPreStartGeneratedRoutes = { + assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin")) + assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin")) + } + + @Test def shouldReceiveResponseViaPostStartGeneratedRoute = { + consumerPublisher.expectPublishCount(1) + // register actor after starting CamelService + val actor4 = new TestActor4().start + consumerPublisher.awaitPublish + assertEquals("Hello Martin (actor4)", template.requestBody("direct:actor4", "Martin")) + actor4.stop + } + + @Test def shouldReceiveResponseViaCustomRoute = { + assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin")) + } + +} + +class TestActor1 extends Actor with Consumer { + def endpointUri = "direct:actor1" + + protected def receive = { + case msg: Message => reply("Hello %s (actor1)" format msg.body) + } +} + +@consume("direct:actor2") +class TestActor2 extends Actor { + protected def receive = { + case msg: Message => reply("Hello %s (actor2)" format msg.body) + } +} + +class TestActor3 extends Actor { + id = "actor3" + + protected def receive = { + case msg: Message => reply("Hello %s (actor3)" format msg.body) + } +} + +class TestActor4 extends Actor with Consumer { + def endpointUri = "direct:actor4" + + protected def receive = { + case msg: Message => reply("Hello %s (actor4)" format msg.body) + } +} + +class TestRouteBuilder extends RouteBuilder { + def configure { + val actorUri = "actor:%s" format classOf[TestActor3].getName + from("direct:actor3").transform(constant("Tester")).to("actor:actor3") + } +} + diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 02ed7894d0..674afeb6ad 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -220,9 +220,10 @@ trait Actor extends TransactionManagement { private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] /** - * This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor. + * This lock ensures thread safety in the dispatching: only one message can + * be dispatched at once on the actor. */ - private[akka] val _dispatcherLock:Lock = new ReentrantLock + private[akka] val _dispatcherLock: Lock = new ReentrantLock // ==================================== // protected fields @@ -876,7 +877,6 @@ trait Actor extends TransactionManagement { * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. */ private[akka] def invoke(messageHandle: MessageInvocation) = synchronized { - //log.trace("%s is invoked with message %s", toString, messageHandle) try { if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 9e0b1cba08..6db4d0375a 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -8,8 +8,7 @@ import se.scalablesolutions.akka.util.Logging import scala.collection.mutable.ListBuffer import scala.reflect.Manifest - -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} /** * Registry holding all Actor instances in the whole system. @@ -23,9 +22,10 @@ import java.util.concurrent.ConcurrentHashMap * @author Jonas Bonér */ object ActorRegistry extends Logging { - private val actorsByUUID = new ConcurrentHashMap[String, Actor] - private val actorsById = new ConcurrentHashMap[String, List[Actor]] - private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]] + private val actorsByUUID = new ConcurrentHashMap[String, Actor] + private val actorsById = new ConcurrentHashMap[String, List[Actor]] + private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]] + private val registrationListeners = new CopyOnWriteArrayList[Actor] /** * Returns all actors in the system. @@ -103,6 +103,9 @@ object ActorRegistry extends Logging { if (actorsByClassName.containsKey(className)) { actorsByClassName.put(className, actor :: actorsByClassName.get(className)) } else actorsByClassName.put(className, actor :: Nil) + + // notify listeners + foreachListener(_.!(ActorRegistered(actor))(None)) } /** @@ -112,6 +115,8 @@ object ActorRegistry extends Logging { actorsByUUID remove actor.uuid actorsById remove actor.getId actorsByClassName remove actor.getClass.getName + // notify listeners + foreachListener(_.!(ActorUnregistered(actor))(None)) } /** @@ -125,4 +130,26 @@ object ActorRegistry extends Logging { actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } + + /** + * Adds the registration listener this this registry's listener list. + */ + def addRegistrationListener(listener: Actor) = { + registrationListeners.add(listener) + } + + /** + * Removes the registration listener this this registry's listener list. + */ + def removeRegistrationListener(listener: Actor) = { + registrationListeners.remove(listener) + } + + private def foreachListener(f: (Actor) => Unit) { + val iterator = registrationListeners.iterator + while (iterator.hasNext) f(iterator.next) + } } + +case class ActorRegistered(actor: Actor) +case class ActorUnregistered(actor: Actor) \ No newline at end of file diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index d7c7a4b2a5..6c0cd87058 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.kernel import se.scalablesolutions.akka.remote.BootableRemoteActorService import se.scalablesolutions.akka.comet.BootableCometActorService import se.scalablesolutions.akka.actor.BootableActorLoaderService +import se.scalablesolutions.akka.camel.service.CamelService import se.scalablesolutions.akka.config.Config import se.scalablesolutions.akka.util.{Logging, Bootable} @@ -37,7 +38,8 @@ object Kernel extends Logging { def boot: Unit = boot(true, new BootableActorLoaderService with BootableRemoteActorService - with BootableCometActorService) + with BootableCometActorService + with CamelService) /** * Boots up the Kernel. diff --git a/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml b/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml new file mode 100644 index 0000000000..b3d811d8de --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + + + diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala new file mode 100644 index 0000000000..11367dedb4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -0,0 +1,92 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.{Actor, RemoteActor} +import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.camel.{Producer, Message, Consumer} +import se.scalablesolutions.akka.util.Logging + +/** + * Client-initiated remote actor. + */ +class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { + def endpointUri = "jetty:http://localhost:6644/remote1" + + protected def receive = { + case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote1"))) + } +} + +/** + * Server-initiated remote actor. + */ +class RemoteActor2 extends Actor with Consumer { + def endpointUri = "jetty:http://localhost:6644/remote2" + + protected def receive = { + case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote2"))) + } +} + +class Producer1 extends Actor with Producer { + def endpointUri = "direct:welcome" + + override def oneway = false // default + override def async = true // default + + protected def receive = produce +} + +class Consumer1 extends Actor with Consumer with Logging { + def endpointUri = "file:data/input" + + def receive = { + case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String])) + } +} + +@consume("jetty:http://0.0.0.0:8877/camel/test1") +class Consumer2 extends Actor { + def receive = { + case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String])) + } +} + +class Consumer3(transformer: Actor) extends Actor with Consumer { + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + case msg: Message => transformer.forward(msg.setBodyAs(classOf[String])) + } +} + +class Transformer(producer: Actor) extends Actor { + protected def receive = { + case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) + } +} + +class Subscriber(name:String, uri: String) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: Message => log.info("%s received: %s" format (name, msg.body)) + } +} + +class Publisher(name: String, uri: String) extends Actor with Producer { + id = name + def endpointUri = uri + override def oneway = true + protected def receive = produce +} + +class PublisherBridge(uri: String, publisher: Actor) extends Actor with Consumer { + def endpointUri = uri + + protected def receive = { + case msg: Message => { + publisher ! msg.bodyAs(classOf[String]) + reply("message published") + } + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala new file mode 100644 index 0000000000..4a55f2014f --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala @@ -0,0 +1,28 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Message +import se.scalablesolutions.akka.remote.RemoteClient + +/** + * @author Martin Krasser + */ +object Application1 { + + // + // TODO: completion of example + // + + def main(args: Array[String]) { + implicit val sender: Option[Actor] = None + + val actor1 = new RemoteActor1 + val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) + + actor1.start + + println(actor1 !! Message("actor1")) + println(actor2 !! Message("actor2")) + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala new file mode 100644 index 0000000000..83c6e8c439 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala @@ -0,0 +1,22 @@ +package sample.camel + +import se.scalablesolutions.akka.camel.service.CamelService +import se.scalablesolutions.akka.remote.RemoteNode + +/** + * @author Martin Krasser + */ +object Application2 { + + // + // TODO: completion of example + // + + def main(args: Array[String]) { + val camelService = CamelService.newInstance + camelService.load + RemoteNode.start("localhost", 7777) + RemoteNode.register("remote2", new RemoteActor2().start) + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala new file mode 100644 index 0000000000..42cb367076 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -0,0 +1,70 @@ +package sample.camel + +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.spring.spi.ApplicationContextRegistry +import org.springframework.context.support.ClassPathXmlApplicationContext + +import se.scalablesolutions.akka.actor.SupervisorFactory +import se.scalablesolutions.akka.camel.CamelContextManager +import se.scalablesolutions.akka.config.ScalaConfig._ + +/** + * @author Martin Krasser + */ +class Boot { + + // Create CamelContext with Spring-based registry and custom route builder + + val context = new ClassPathXmlApplicationContext("/sample-camel-context.xml", getClass) + val registry = new ApplicationContextRegistry(context) + CamelContextManager.init(new DefaultCamelContext(registry)) + CamelContextManager.context.addRoutes(new CustomRouteBuilder) + + // Basic example + + val factory = SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise(new Consumer1, LifeCycle(Permanent)) :: + Supervise(new Consumer2, LifeCycle(Permanent)) :: Nil)) + factory.newInstance.start + + // Routing example + + val producer = new Producer1 + val mediator = new Transformer(producer) + val consumer = new Consumer3(mediator) + + producer.start + mediator.start + consumer.start + + // Publish subscribe example + + val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target" + val cometdSubscriber = new Subscriber("cometd-subscriber", cometdUri).start + val cometdPublisher = new Publisher("cometd-publisher", cometdUri).start + + val jmsUri = "jms:topic:test" + val jmsSubscriber1 = new Subscriber("jms-subscriber-1", jmsUri).start + val jmsSubscriber2 = new Subscriber("jms-subscriber-2", jmsUri).start + val jmsPublisher = new Publisher("jms-publisher", jmsUri).start + + val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start + val jmsPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher).start + +} + +class CustomRouteBuilder extends RouteBuilder { + def configure { + val actorUri = "actor:%s" format classOf[Consumer2].getName + from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri) + from("direct:welcome").process(new Processor() { + def process(exchange: Exchange) { + exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) + } + }) + } +} \ No newline at end of file diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/annotation/consume.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/annotation/consume.java new file mode 100644 index 0000000000..3f8ab9455a --- /dev/null +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/annotation/consume.java @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface consume { + + public abstract String value(); + +} \ No newline at end of file diff --git a/config/akka-reference.conf b/config/akka-reference.conf index ca9f1068ea..6c442e3ef1 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -19,8 +19,9 @@ # FQN to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor - boot = ["sample.java.Boot", - "sample.scala.Boot", + boot = ["sample.camel.Boot", + "sample.java.Boot", + "sample.scala.Boot", "se.scalablesolutions.akka.security.samples.Boot"] diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 72b51d4f4d..326daab975 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -79,12 +79,13 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_core) lazy val akka_rest = project("akka-rest", "akka-rest", new AkkaRestProject(_), akka_core) lazy val akka_comet = project("akka-comet", "akka-comet", new AkkaCometProject(_), akka_rest) + lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_core) lazy val akka_patterns = project("akka-patterns", "akka-patterns", new AkkaPatternsProject(_), akka_core) lazy val akka_security = project("akka-security", "akka-security", new AkkaSecurityProject(_), akka_core) lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_)) lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_)) lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), - akka_core, akka_rest, akka_persistence, akka_cluster, akka_amqp, akka_security, akka_comet, akka_patterns) + akka_core, akka_rest, akka_persistence, akka_cluster, akka_amqp, akka_security, akka_comet, akka_camel, akka_patterns) // functional tests in java lazy val akka_fun_test = project("akka-fun-test-java", "akka-fun-test-java", new AkkaFunTestProject(_), akka_kernel) @@ -112,6 +113,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-cluster-jgroups_%s-%s.jar".format(defScalaVersion.value, version) + " dist/akka-rest_%s-%s.jar".format(defScalaVersion.value, version) + " dist/akka-comet_%s-%s.jar".format(defScalaVersion.value, version) + + " dist/akka-camel_%s-%s.jar".format(defScalaVersion.value, version) + " dist/akka-security_%s-%s.jar".format(defScalaVersion.value, version) + " dist/akka-amqp_%s-%s.jar".format(defScalaVersion.value, version) + " dist/akka-patterns_%s-%s.jar".format(defScalaVersion.value, version) + @@ -208,6 +210,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying") } + class AkkaCamelProject(info: ProjectInfo) extends DefaultProject(info) { + val camel_core = "org.apache.camel" % "camel-core" % "2.2.0" % "compile" + lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying") + } + class AkkaPatternsProject(info: ProjectInfo) extends DefaultProject(info) { // testing val scalatest = "org.scalatest" % "scalatest" % "1.0" % "test" @@ -306,7 +313,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val lift_util = "net.liftweb" % "lift-util" % "1.1-M6" % "compile" val servlet = "javax.servlet" % "servlet-api" % "2.5" % "compile" // testing - val jetty = "org.mortbay.jetty" % "jetty" % "6.1.6" % "test" + val jetty = "org.mortbay.jetty" % "jetty" % "6.1.11" % "test" val junit = "junit" % "junit" % "4.5" % "test" lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying") } @@ -320,6 +327,17 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying") } + class AkkaSampleCamelProject(info: ProjectInfo) extends DefaultProject(info) { + val jetty = "org.mortbay.jetty" % "jetty" % "6.1.11" % "compile" + val jetty_client = "org.mortbay.jetty" % "jetty-client" % "6.1.11" % "compile" + val camel_http = "org.apache.camel" % "camel-http" % "2.2.0" % "compile" + val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.2.0" % "compile" intransitive() + val camel_jms = "org.apache.camel" % "camel-jms" % "2.2.0" % "compile" + val camel_cometd = "org.apache.camel" % "camel-cometd" % "2.2.0" % "compile" + val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.0" % "compile" + lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying") + } + class AkkaSampleSecurityProject(info: ProjectInfo) extends DefaultProject(info) { val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" @@ -331,6 +349,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift", new AkkaSampleLiftProject(_), akka_kernel) lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java", new AkkaSampleRestJavaProject(_), akka_kernel) lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala", new AkkaSampleRestScalaProject(_), akka_kernel) + lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel", new AkkaSampleCamelProject(_), akka_kernel) lazy val akka_sample_security = project("akka-sample-security", "akka-sample-security", new AkkaSampleSecurityProject(_), akka_kernel) }