Merge branch 'master' into workstealing

This commit is contained in:
Jan Van Besien 2010-03-17 14:39:05 +01:00
commit fd40d15fb3
43 changed files with 1677 additions and 45 deletions

View file

@ -0,0 +1 @@
class=se.scalablesolutions.akka.camel.component.ActorComponent

View file

@ -0,0 +1,95 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import org.apache.camel.{ProducerTemplate, CamelContext}
import org.apache.camel.impl.DefaultCamelContext
import se.scalablesolutions.akka.util.Logging
/**
* Defines the lifecycle of a CamelContext. Allowed state transitions are
* init -> start -> stop -> init -> ... etc.
*
* @author Martin Krasser
*/
trait CamelContextLifecycle extends Logging {
// TODO: enforce correct state transitions
// valid: init -> start -> stop -> init ...
private var _context: CamelContext = _
private var _template: ProducerTemplate = _
private var _initialized = false
private var _started = false
/**
* Returns the managed CamelContext.
*/
protected def context: CamelContext = _context
/**
* Returns the managed ProducerTemplate.
*/
protected def template: ProducerTemplate = _template
/**
* Sets the managed CamelContext.
*/
protected def context_= (context: CamelContext) { _context = context }
/**
* Sets the managed ProducerTemplate.
*/
protected def template_= (template: ProducerTemplate) { _template = template }
def initialized = _initialized
def started = _started
/**
* Starts the CamelContext and ProducerTemplate.
*/
def start = {
context.start
template.start
_started = true
log.info("Camel context started")
}
/**
* Stops the CamelContext and ProducerTemplate.
*/
def stop = {
template.stop
context.stop
_initialized = false
_started = false
log.info("Camel context stopped")
}
/**
* Initializes this lifecycle object with the a DefaultCamelContext.
*/
def init: Unit = init(new DefaultCamelContext)
/**
* Initializes this lifecycle object with the given CamelContext.
*/
def init(context: CamelContext) {
this.context = context
this.template = context.createProducerTemplate
_initialized = true
log.info("Camel context initialized")
}
}
/**
* Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle
* of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService.
*/
object CamelContextManager extends CamelContextLifecycle {
override def context: CamelContext = super.context
override def template: ProducerTemplate = super.template
}

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import se.scalablesolutions.akka.actor.Actor
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
*
* @author Martin Krasser
*/
trait Consumer { self: Actor =>
/**
* Returns the Camel endpoint URI to consume messages from.
*/
def endpointUri: String
}

View file

@ -0,0 +1,249 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import org.apache.camel.{Exchange, Message => CamelMessage}
import org.apache.camel.util.ExchangeHelper
import scala.collection.jcl.{Map => MapWrapper}
/**
* An immutable representation of a Camel message. Actor classes that mix in
* se.scalablesolutions.akka.camel.Producer or
* se.scalablesolutions.akka.camel.Consumer use this message type for communication.
*
* @author Martin Krasser
*/
case class Message(val body: Any, val headers: Map[String, Any]) {
/**
* Creates a message with a body and an empty header map.
*/
def this(body: Any) = this(body, Map.empty)
/**
* Returns the body of the message converted to the type given by the <code>clazz</code>
* argument. Conversion is done using Camel's type converter. The type converter is obtained
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
* initialization of CamelContextManager.
*
* @see CamelContextManager.
*/
def bodyAs[T](clazz: Class[T]): T =
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, body)
/**
* Returns those headers from this message whose name is contained in <code>names</code>.
*/
def headers(names: Set[String]): Map[String, Any] = headers.filter(names contains _._1)
/**
* Creates a Message with a new <code>body</code> using a <code>transformer</code> function.
*/
def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A]))
/**
* Creates a Message with a new <code>body</code> converted to type <code>clazz</code>.
*
* @see Message#bodyAs(Class)
*/
def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz))
/**
* Creates a Message with a new <code>body</code>.
*/
def setBody(body: Any) = new Message(body, this.headers)
/**
* Creates a new Message with new <code>headers</code>.
*/
def setHeaders(headers: Map[String, Any]) = new Message(this.body, headers)
/**
* Creates a new Message with the <code>headers</code> argument added to the existing headers.
*/
def addHeaders(headers: Map[String, Any]) = new Message(this.body, this.headers ++ headers)
/**
* Creates a new Message with the <code>header</code> argument added to the existing headers.
*/
def addHeader(header: (String, Any)) = new Message(this.body, this.headers + header)
/**
* Creates a new Message where the header with name <code>headerName</code> is removed from
* the existing headers.
*/
def removeHeader(headerName: String) = new Message(this.body, this.headers - headerName)
}
/**
* Companion object of Message class.
*
* @author Martin Krasser
*/
object Message {
/**
* Message header to correlate request with response messages. Applications that send
* messages to a Producer actor may want to set this header on the request message
* so that it can be correlated with an asynchronous response. Messages send to Consumer
* actors have this header already set.
*/
val MessageExchangeId = "MessageExchangeId".intern
/**
* Creates a new Message with <code>body</code> as message body and an empty header map.
*/
def apply(body: Any) = new Message(body)
/**
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type
* Message then <code>msg</code> is returned, otherwise <code>msg</code> is set as body of a
* newly created Message object.
*/
def canonicalize(msg: Any) = msg match {
case mobj: Message => mobj
case body => new Message(body)
}
}
/**
* An immutable representation of a failed Camel exchange. It contains the failure cause
* obtained from Exchange.getException and the headers from either the Exchange.getIn
* message or Exchange.getOut message, depending on the exchange pattern.
*
* @author Martin Krasser
*/
case class Failure(val cause: Exception, val headers: Map[String, Any])
/**
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
*
* @author Martin Krasser
*/
class CamelExchangeAdapter(exchange: Exchange) {
import CamelMessageConversion.toMessageAdapter
/**
* Sets Exchange.getIn from the given Message object.
*/
def fromRequestMessage(msg: Message): Exchange = { requestMessage.fromMessage(msg); exchange }
/**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* Message object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn.
*/
def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange }
/**
* Sets Exchange.getException from the given Failure message. Headers of the Failure message
* are ignored.
*/
def fromFailureMessage(msg: Failure): Exchange = { exchange.setException(msg.cause); exchange }
/**
* Creates a Message object from Exchange.getIn.
*/
def toRequestMessage: Message = toRequestMessage(Map.empty)
/**
* Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut.
* If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn.
*/
def toResponseMessage: Message = toResponseMessage(Map.empty)
/**
* Creates a Failure object from the adapted Exchange.
*
* @see Failure
*/
def toFailureMessage: Failure = toFailureMessage(Map.empty)
/**
* Creates a Message object from Exchange.getIn.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toRequestMessage(headers: Map[String, Any]): Message = requestMessage.toMessage(headers)
/**
* Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut.
* If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toResponseMessage(headers: Map[String, Any]): Message = responseMessage.toMessage(headers)
/**
* Creates a Failure object from the adapted Exchange.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*
* @see Failure
*/
def toFailureMessage(headers: Map[String, Any]): Failure =
Failure(exchange.getException, headers ++ responseMessage.toMessage.headers)
private def requestMessage = exchange.getIn
private def responseMessage = ExchangeHelper.getResultMessage(exchange)
}
/**
* Adapter for converting an org.apache.camel.Message to and from Message objects.
*
* @author Martin Krasser
*/
class CamelMessageAdapter(val cm: CamelMessage) {
/**
* Set the adapted Camel message from the given Message object.
*/
def fromMessage(m: Message): CamelMessage = {
cm.setBody(m.body)
for (h <- m.headers) cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef])
cm
}
/**
* Creates a new Message object from the adapted Camel message.
*/
def toMessage: Message = toMessage(Map.empty)
/**
* Creates a new Message object from the adapted Camel message.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toMessage(headers: Map[String, Any]): Message = Message(cm.getBody, cmHeaders(headers, cm))
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) =
headers ++ MapWrapper[String, AnyRef](cm.getHeaders).elements
}
/**
* Defines conversion methods to CamelExchangeAdapter and CamelMessageAdapter.
* Imported by applications
* that implicitly want to use conversion methods of CamelExchangeAdapter and CamelMessageAdapter.
*/
object CamelMessageConversion {
/**
* Creates an CamelExchangeAdapter for the given Camel exchange.
*/
implicit def toExchangeAdapter(ce: Exchange): CamelExchangeAdapter =
new CamelExchangeAdapter(ce)
/**
* Creates an CamelMessageAdapter for the given Camel message.
*/
implicit def toMessageAdapter(cm: CamelMessage): CamelMessageAdapter =
new CamelMessageAdapter(cm)
}

View file

@ -0,0 +1,192 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import CamelMessageConversion.toExchangeAdapter
import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate}
import org.apache.camel.impl.DefaultExchange
import org.apache.camel.spi.Synchronization
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.util.Logging
/**
* Mixed in by Actor implementations that produce messages to Camel endpoints.
*
* @author Martin Krasser
*/
trait Producer { self: Actor =>
private val headersToCopyDefault = Set(Message.MessageExchangeId)
/**
* If set to true (default), communication with the Camel endpoint is done via the Camel
* <a href="http://camel.apache.org/async.html">Async API</a>. Camel then processes the
* message in a separate thread. If set to false, the actor thread is blocked until Camel
* has finished processing the produced message.
*/
def async: Boolean = true
/**
* If set to false (default), this producer expects a response message from the Camel endpoint.
* If set to true, this producer communicates with the Camel endpoint with an in-only message
* exchange pattern (fire and forget).
*/
def oneway: Boolean = false
/**
* Returns the Camel endpoint URI to produce messages to.
*/
def endpointUri: String
/**
* Returns the names of message headers to copy from a request message to a response message.
* By default only the Message.MessageExchangeId is copied. Applications may override this to
* define an application-specific set of message headers to copy.
*/
def headersToCopy: Set[String] = headersToCopyDefault
/**
* Returns the producer template from the CamelContextManager. Applications either have to ensure
* proper initialization of CamelContextManager or override this method.
*
* @see CamelContextManager.
*/
protected def template: ProducerTemplate = CamelContextManager.template
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOneway(msg: Any): Unit =
template.send(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOnewayAsync(msg: Any): Unit =
template.asyncSend(
endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
*/
protected def produce(msg: Any): Any = {
val cmsg = Message.canonicalize(msg)
val requestProcessor = new Processor() {
def process(exchange: Exchange) = exchange.fromRequestMessage(cmsg)
}
val result = template.request(endpointUri, requestProcessor)
if (result.isFailed) result.toFailureMessage(cmsg.headers(headersToCopy))
else result.toResponseMessage(cmsg.headers(headersToCopy))
}
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel. The response message is returned asynchronously to
* the original sender (or sender future).
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
* @see ProducerResponseSender
*/
protected def produceAsync(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg)
val sync = new ProducerResponseSender(
cmsg.headers(headersToCopy), this.sender, this.senderFuture, this)
template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync)
}
/**
* Default implementation for Actor.receive. Implementors may choose to
* <code>def receive = produce</code>. This partial function calls one of
* the protected produce methods depending on the return values of
* <code>oneway</code> and <code>async</code>.
*/
protected def produce: PartialFunction[Any, Unit] = {
case msg => {
if ( oneway && !async) produceOneway(msg)
else if ( oneway && async) produceOnewayAsync(msg)
else if (!oneway && !async) reply(produce(msg))
else /*(!oneway && async)*/ produceAsync(msg)
}
}
/**
* Creates a new in-only Exchange.
*/
protected def createInOnlyExchange: Exchange = createExchange(ExchangePattern.InOnly)
/**
* Creates a new in-out Exchange.
*/
protected def createInOutExchange: Exchange = createExchange(ExchangePattern.InOut)
/**
* Creates a new Exchange with given pattern from the CamelContext managed by
* CamelContextManager. Applications either have to ensure proper initialization
* of CamelContextManager or override this method.
*
* @see CamelContextManager.
*/
protected def createExchange(pattern: ExchangePattern): Exchange =
new DefaultExchange(CamelContextManager.context, pattern)
}
/**
* Synchronization object that sends responses asynchronously to initial senders. This
* class is used by Producer for asynchronous two-way messaging with a Camel endpoint.
*
* @author Martin Krasser
*/
class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[Actor],
senderFuture: Option[CompletableFuture],
producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender
/**
* Replies a Failure message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*/
def onFailure(exchange: Exchange) = reply(exchange.toFailureMessage(headers))
/**
* Replies a response Message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*/
def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers))
private def reply(message: Any) = {
sender match {
case Some(actor) => actor ! message
case None => senderFuture match {
case Some(future) => future.completeWithResult(message)
case None => log.warning("No destination for sending response")
}
}
}
}

View file

@ -0,0 +1,152 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.component
import java.lang.{RuntimeException, String}
import java.util.{Map => JavaMap}
import java.util.concurrent.TimeoutException
import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
/**
* Camel component for sending messages to and receiving replies from actors.
*
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
* @see se.scalablesolutions.akka.camel.component.ActorProducer
*
* @author Martin Krasser
*/
class ActorComponent extends DefaultComponent {
def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
val idAndUuid = idAndUuidPair(remaining)
new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2)
}
private def idAndUuidPair(remaining: String): Tuple2[Option[String], Option[String]] = {
remaining split ":" toList match {
case id :: Nil => (Some(id), None)
case "id" :: id :: Nil => (Some(id), None)
case "uuid" :: uuid :: Nil => (None, Some(uuid))
case _ => throw new IllegalArgumentException(
"invalid path format: %s - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
}
}
}
/**
* Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI.
* An actor can be referenced by its <code>Actor.getId</code> or its <code>Actor.uuid</code>.
* Supported endpoint URI formats are
* <code>actor:&lt;actorid&gt;</code>,
* <code>actor:id:&lt;actorid&gt;</code> and
* <code>actor:uuid:&lt;actoruuid&gt;</code>.
*
* @see se.scalablesolutions.akka.camel.component.ActorComponent
* @see se.scalablesolutions.akka.camel.component.ActorProducer
* @author Martin Krasser
*/
class ActorEndpoint(uri: String,
comp: ActorComponent,
val id: Option[String],
val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
/**
* @throws UnsupportedOperationException
*/
def createConsumer(processor: Processor): Consumer =
throw new UnsupportedOperationException("actor consumer not supported yet")
/**
* Creates a new ActorProducer instance initialized with this endpoint.
*/
def createProducer: ActorProducer = new ActorProducer(this)
/**
* Returns true.
*/
def isSingleton: Boolean = true
}
/**
* Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable,
* the producer waits for a reply (using the !! operator), otherwise the ! operator is used
* for sending the message.
*
* @see se.scalablesolutions.akka.camel.component.ActorComponent
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
*
* @author Martin Krasser
*/
class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
import CamelMessageConversion.toExchangeAdapter
implicit val sender = None
/**
* Depending on the exchange pattern, this method either calls processInOut or
* processInOnly for interacting with an actor. This methods looks up the actor
* from the ActorRegistry according to this producer's endpoint URI.
*
* @param exchange represents the message exchange with the actor.
*/
def process(exchange: Exchange) {
val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
if (exchange.getPattern.isOutCapable) processInOut(exchange, actor)
else processInOnly(exchange, actor)
}
/**
* Send the exchange in-message to the given actor using the ! operator. The message
* send to the actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOnly(exchange: Exchange, actor: Actor): Unit =
actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
/**
* Send the exchange in-message to the given actor using the !! operator. The exchange
* out-message is populated from the actor's reply message. The message sent to the
* actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOut(exchange: Exchange, actor: Actor) {
val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
val result: Any = actor !! exchange.toRequestMessage(header)
result match {
case Some(msg: Failure) => exchange.fromFailureMessage(msg)
case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg))
case None => {
throw new TimeoutException("timeout (%d ms) while waiting response from %s"
format (actor.timeout, ep.getEndpointUri))
}
}
}
private def target: Option[Actor] =
if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get)
private def targetById(id: String) = ActorRegistry.actorsFor(id) match {
case Nil => None
case actor :: Nil => Some(actor)
case actors => Some(actors.first)
}
private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid)
}
/**
* Thrown to indicate that an actor referenced by an endpoint URI cannot be
* found in the ActorRegistry.
*
* @author Martin Krasser
*/
class ActorNotRegisteredException(uri: String) extends RuntimeException {
override def getMessage = "%s not registered" format uri
}

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.util.{Bootable, Logging}
/**
* Used by applications (and the Kernel) to publish consumer actors via Camel
* endpoints and to manage the life cycle of a a global CamelContext which can
* be accessed via se.scalablesolutions.akka.camel.CamelContextManager.
*
* @author Martin Krasser
*/
trait CamelService extends Bootable with Logging {
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import CamelContextManager._
private[camel] val consumerPublisher = new ConsumerPublisher
private[camel] val publishRequestor = new PublishRequestor(consumerPublisher)
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)
* published as Camel endpoint. Consumer actors that are started after this method returned will
* be published as well. Actor publishing is done asynchronously.
*/
abstract override def onLoad = {
super.onLoad
// Only init and start if not already done by application
if (!initialized) init
if (!started) start
// Camel should cache input streams
context.setStreamCaching(true)
// start actor that exposes consumer actors via Camel endpoints
consumerPublisher.start
// add listener for actor registration events
ActorRegistry.addRegistrationListener(publishRequestor.start)
// publish already registered consumer actors
for (publish <- Publish.forConsumers(ActorRegistry.actors)) consumerPublisher ! publish
}
/**
* Stops the CamelService.
*/
abstract override def onUnload = {
ActorRegistry.removeRegistrationListener(publishRequestor)
publishRequestor.stop
consumerPublisher.stop
stop
super.onUnload
}
/**
* Starts the CamelService.
*
* @see onLoad
*/
def load = onLoad
/**
* Stops the CamelService.
*
* @see onUnload
*/
def unload = onUnload
}
/**
* CamelService companion object used by standalone applications to create their own
* CamelService instance.
*
* @author Martin Krasser
*/
object CamelService {
/**
* Creates a new CamelService instance.
*/
def newInstance: CamelService = new CamelService {}
}

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import java.io.InputStream
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor}
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
* se.scalablesolutions.akka.camel.service.Publish.
*
* @author Martin Krasser
*/
class ConsumerPublisher extends Actor with Logging {
@volatile private var latch = new CountDownLatch(0)
/**
* Adds a route to the actor identified by a Publish message to the global CamelContext.
*/
protected def receive = {
case p: Publish => publish(new ConsumerRoute(p.endpointUri, p.id, p.uuid))
case _ => { /* ignore */}
}
/**
* Sets the number of expected Publish messages received by this actor. Used for testing
* only.
*/
private[camel] def expectPublishCount(count: Int): Unit = latch = new CountDownLatch(count)
/**
* Waits for the number of expected Publish messages to arrive. Used for testing only.
*/
private[camel] def awaitPublish = latch.await
private def publish(route: ConsumerRoute) {
CamelContextManager.context.addRoutes(route)
log.info("published actor via endpoint %s" format route.endpointUri)
latch.countDown // needed for testing only.
}
}
/**
* Defines the route to a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Acotr.getId.
*
* @author Martin Krasser
*/
class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
)
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(actorUri)
case None => from(endpointUri).to(actorUri)
}
}
private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id
}
/**
* A registration listener that publishes consumer actors (and ignores other actors).
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: Actor) extends Actor {
protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match {
case Some(publish) => consumerPublisher ! publish
case None => { /* ignore */ }
}
}
}
/**
* Request message for publishing a consumer actor.
*
* @param endpointUri endpoint URI of the consumer actor
* @param id actor identifier
* @param uuid <code>true</code> if <code>id</code> refers to Actor.uuid, <code>false</code> if
* <code>id</code> refers to Acotr.getId.
*
* @author Martin Krasser
*/
case class Publish(endpointUri: String, id: String, uuid: Boolean)
/**
* @author Martin Krasser
*/
object Publish {
/**
* Creates a list of Publish request messages for all consumer actors in the <code>actors</code>
* list.
*/
def forConsumers(actors: List[Actor]): List[Publish] =
for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get
/**
* Creates a Publish request message if <code>actor</code> is a consumer actor.
*/
def forConsumer(actor: Actor): Option[Publish] =
forConsumeAnnotated(actor) orElse forConsumerType(actor)
private def forConsumeAnnotated(actor: Actor): Option[Publish] = {
val annotation = actor.getClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actor._remoteAddress.isDefined) None // do not publish proxies
else Some(Publish(annotation.value, actor.getId, false))
}
private def forConsumerType(actor: Actor): Option[Publish] =
if (!actor.isInstanceOf[Consumer]) None
else if (actor._remoteAddress.isDefined) None
else Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true))
}

View file

@ -0,0 +1,79 @@
package se.scalablesolutions.akka.camel
import java.io.InputStream
import org.apache.camel.NoTypeConversionAvailableException
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class MessageTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
@Test def shouldConvertDoubleBodyToString = {
CamelContextManager.init
assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String]))
}
@Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream {
CamelContextManager.init
intercept[NoTypeConversionAvailableException] {
Message(1.4, null).bodyAs(classOf[InputStream])
}
}
@Test def shouldReturnSubsetOfHeaders = {
val message = Message("test" , Map("A" -> "1", "B" -> "2"))
assertEquals(Map("B" -> "2"), message.headers(Set("B")))
}
@Test def shouldTransformBodyAndPreserveHeaders = {
assertEquals(
Message("ab", Map("A" -> "1")),
Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b"))
}
@Test def shouldConvertBodyAndPreserveHeaders = {
CamelContextManager.init
assertEquals(
Message("1.4", Map("A" -> "1")),
Message(1.4 , Map("A" -> "1")).setBodyAs(classOf[String]))
}
@Test def shouldSetBodyAndPreserveHeaders = {
assertEquals(
Message("test2" , Map("A" -> "1")),
Message("test1" , Map("A" -> "1")).setBody("test2"))
}
@Test def shouldSetHeadersAndPreserveBody = {
assertEquals(
Message("test1" , Map("C" -> "3")),
Message("test1" , Map("A" -> "1")).setHeaders(Map("C" -> "3")))
}
@Test def shouldAddHeaderAndPreserveBodyAndHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1", "B" -> "2")),
Message("test1" , Map("A" -> "1")).addHeader("B" -> "2"))
}
@Test def shouldAddHeadersAndPreserveBodyAndHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1", "B" -> "2")),
Message("test1" , Map("A" -> "1")).addHeaders(Map("B" -> "2")))
}
@Test def shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1")),
Message("test1" , Map("A" -> "1", "B" -> "2")).removeHeader("B"))
}
}

View file

@ -0,0 +1,109 @@
package se.scalablesolutions.akka.camel
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import org.junit.Assert._
import org.junit.{Test, After, Before}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
class ProducerTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
import CamelContextManager._
var mock: MockEndpoint = _
@Before def setUp = {
init
context.addRoutes(new TestRouteBuilder)
start
mock = context.getEndpoint("mock:mock", classOf[MockEndpoint])
}
@After def tearDown = {
stop
}
//
// TODO: test replies to messages sent with ! (bang)
// TODO: test copying of custom message headers
//
@Test def shouldProduceMessageSyncAndReceiveResponse = {
val producer = new TestProducer("direct:input2", false, false).start
val message = Message("test1", Map(Message.MessageExchangeId -> "123"))
val expected = Message("Hello test1", Map(Message.MessageExchangeId -> "123"))
assertEquals(expected, producer !! message get)
producer.stop
}
@Test def shouldProduceMessageSyncAndReceiveFailure = {
val producer = new TestProducer("direct:input2", false, false).start
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = producer.!![Failure](message).get
assertEquals("failure", result.cause.getMessage)
assertEquals(Map(Message.MessageExchangeId -> "123"), result.headers)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveResponse = {
val producer = new TestProducer("direct:input2", true, false).start
val message = Message("test2", Map(Message.MessageExchangeId -> "124"))
val expected = Message("Hello test2", Map(Message.MessageExchangeId -> "124"))
assertEquals(expected, producer !! message get)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveFailure = {
val producer = new TestProducer("direct:input2", true, false).start
val message = Message("fail", Map(Message.MessageExchangeId -> "124"))
val result = producer.!![Failure](message).get
assertEquals("failure", result.cause.getMessage)
assertEquals(Map(Message.MessageExchangeId -> "124"), result.headers)
producer.stop
}
@Test def shouldProduceMessageSyncWithoutReceivingResponse = {
val producer = new TestProducer("direct:input1", false, true).start
mock.expectedBodiesReceived("test3")
producer.!("test3")(None)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveResponseSync = {
val producer = new TestProducer("direct:input1", true, true).start
mock.expectedBodiesReceived("test4")
producer.!("test4")(None)
producer.stop
}
class TestProducer(uri:String, prodAsync: Boolean, prodOneway: Boolean) extends Actor with Producer {
override def async = prodAsync
override def oneway = prodOneway
def endpointUri = uri
def receive = produce
}
class TestRouteBuilder extends RouteBuilder {
def configure {
from("direct:input1").to("mock:mock")
from("direct:input2").process(new Processor() {
def process(exchange: Exchange) = {
val body = exchange.getIn.getBody
body match {
case "fail" => throw new Exception("failure")
case body => exchange.getOut.setBody("Hello %s" format body)
}
}
})
}
}
}

View file

@ -0,0 +1,49 @@
package se.scalablesolutions.akka.camel.component
import org.junit._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.{CamelContextLifecycle, Message}
class ActorComponentTest extends JUnitSuite with CamelContextLifecycle {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
@Before def setUp = {
init
start
}
@After def tearDown = {
stop
}
@Test def shouldReceiveResponseFromActorReferencedById = {
val actor = new TestActor
actor.start
assertEquals("Hello Martin", template.requestBody("actor:%s" format actor.getId, "Martin"))
assertEquals("Hello Martin", template.requestBody("actor:id:%s" format actor.getId, "Martin"))
actor.stop
}
@Test def shouldReceiveResponseFromActorReferencedByUuid = {
val actor = new TestActor
actor.start
assertEquals("Hello Martin", template.requestBody("actor:uuid:%s" format actor.uuid, "Martin"))
actor.stop
}
class TestActor extends Actor {
protected def receive = {
case msg: Message => reply("Hello %s" format msg.body)
}
}
}

View file

@ -0,0 +1,42 @@
package se.scalablesolutions.akka.camel.component
import org.apache.camel.{CamelContext, ExchangePattern}
import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange}
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
class ActorProducerTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
val context = new DefaultCamelContext
val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName)
val producer = endpoint.createProducer
@Test def shouldSendAndReceiveMessageBodyAndHeaders = {
val exchange = new DefaultExchange(null.asInstanceOf[CamelContext], ExchangePattern.InOut)
val actor = new TestActor
actor.start
exchange.getIn.setBody("Martin")
exchange.getIn.setHeader("k1", "v1")
producer.process(exchange)
assertEquals("Hello Martin", exchange.getOut.getBody)
assertEquals("v1", exchange.getOut.getHeader("k1"))
assertEquals("v2", exchange.getOut.getHeader("k2"))
actor.stop
}
class TestActor extends Actor {
protected def receive = {
case msg: Message => reply(Message("Hello %s" format msg.body, Map("k2" -> "v2") ++ msg.headers))
}
}
}

View file

@ -0,0 +1,103 @@
package se.scalablesolutions.akka.camel.service
import org.apache.camel.builder.RouteBuilder
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message}
import org.junit.{Ignore, Before, After, Test}
class CamelServiceTest extends JUnitSuite with CamelService {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
import CamelContextManager._
var actor1: Actor = _
var actor2: Actor = _
var actor3: Actor = _
@Before def setUp = {
// register actors before starting the CamelService
actor1 = new TestActor1().start
actor2 = new TestActor2().start
actor3 = new TestActor3().start
// initialize global CamelContext
init
// customize global CamelContext
context.addRoutes(new TestRouteBuilder)
consumerPublisher.expectPublishCount(2)
load
consumerPublisher.awaitPublish
}
@After def tearDown = {
unload
actor1.stop
actor2.stop
actor3.stop
}
@Test def shouldReceiveResponseViaPreStartGeneratedRoutes = {
assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin"))
assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin"))
}
@Test def shouldReceiveResponseViaPostStartGeneratedRoute = {
consumerPublisher.expectPublishCount(1)
// register actor after starting CamelService
val actor4 = new TestActor4().start
consumerPublisher.awaitPublish
assertEquals("Hello Martin (actor4)", template.requestBody("direct:actor4", "Martin"))
actor4.stop
}
@Test def shouldReceiveResponseViaCustomRoute = {
assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin"))
}
}
class TestActor1 extends Actor with Consumer {
def endpointUri = "direct:actor1"
protected def receive = {
case msg: Message => reply("Hello %s (actor1)" format msg.body)
}
}
@consume("direct:actor2")
class TestActor2 extends Actor {
protected def receive = {
case msg: Message => reply("Hello %s (actor2)" format msg.body)
}
}
class TestActor3 extends Actor {
id = "actor3"
protected def receive = {
case msg: Message => reply("Hello %s (actor3)" format msg.body)
}
}
class TestActor4 extends Actor with Consumer {
def endpointUri = "direct:actor4"
protected def receive = {
case msg: Message => reply("Hello %s (actor4)" format msg.body)
}
}
class TestRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[TestActor3].getName
from("direct:actor3").transform(constant("Tester")).to("actor:actor3")
}
}

View file

@ -19,7 +19,7 @@ import java.net.InetSocketAddress
import java.lang.reflect.{InvocationTargetException, Method}
object Annotations {
import se.scalablesolutions.akka.annotation._
import se.scalablesolutions.akka.actor.annotation._
val oneway = classOf[oneway]
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]

View file

@ -220,9 +220,10 @@ trait Actor extends TransactionManagement {
private[akka] val _mailbox: Deque[MessageInvocation] = new LinkedBlockingDeque[MessageInvocation]
/**
* This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor.
* This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor.
*/
private[akka] val _dispatcherLock:Lock = new ReentrantLock
private[akka] val _dispatcherLock: Lock = new ReentrantLock
// ====================================
// protected fields
@ -506,8 +507,6 @@ trait Actor extends TransactionManagement {
def !![T](message: Any, timeout: Long): Option[T] = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
@ -878,7 +877,6 @@ trait Actor extends TransactionManagement {
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
//log.trace("%s is invoked with message %s", toString, messageHandle)
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)

View file

@ -8,8 +8,7 @@ import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
/**
* Registry holding all Actor instances in the whole system.
@ -23,9 +22,10 @@ import java.util.concurrent.ConcurrentHashMap
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
private val registrationListeners = new CopyOnWriteArrayList[Actor]
/**
* Returns all actors in the system.
@ -103,6 +103,9 @@ object ActorRegistry extends Logging {
if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actor :: Nil)
// notify listeners
foreachListener(_.!(ActorRegistered(actor))(None))
}
/**
@ -112,6 +115,8 @@ object ActorRegistry extends Logging {
actorsByUUID remove actor.uuid
actorsById remove actor.getId
actorsByClassName remove actor.getClass.getName
// notify listeners
foreachListener(_.!(ActorUnregistered(actor))(None))
}
/**
@ -125,4 +130,26 @@ object ActorRegistry extends Logging {
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
/**
* Adds the registration <code>listener</code> this this registry's listener list.
*/
def addRegistrationListener(listener: Actor) = {
registrationListeners.add(listener)
}
/**
* Removes the registration <code>listener</code> this this registry's listener list.
*/
def removeRegistrationListener(listener: Actor) = {
registrationListeners.remove(listener)
}
private def foreachListener(f: (Actor) => Unit) {
val iterator = registrationListeners.iterator
while (iterator.hasNext) f(iterator.next)
}
}
case class ActorRegistered(actor: Actor)
case class ActorUnregistered(actor: Actor)

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.oneway;
import se.scalablesolutions.akka.actor.annotation.oneway;
public interface Bar {
@oneway

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.api;
import com.google.inject.Inject;
import se.scalablesolutions.akka.annotation.oneway;
import se.scalablesolutions.akka.actor.annotation.oneway;
public class Foo extends se.scalablesolutions.akka.serialization.Serializable.JavaJSON {
@Inject

View file

@ -1,9 +1,9 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.stm.*;
@transactionrequired

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.stm.*;
@transactionrequired

View file

@ -2,7 +2,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
import se.scalablesolutions.akka.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
public class PersistentClasher {
private PersistentMap state;

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.inittransactionalstate;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.inittransactionalstate;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.remote.BootableRemoteActorService
import se.scalablesolutions.akka.comet.BootableCometActorService
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.config.Config
import se.scalablesolutions.akka.util.{Logging, Bootable}
@ -37,7 +38,8 @@ object Kernel extends Logging {
def boot: Unit = boot(true,
new BootableActorLoaderService
with BootableRemoteActorService
with BootableCometActorService)
with BootableCometActorService
with CamelService)
/**
* Boots up the Kernel.

View file

@ -0,0 +1,23 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- ================================================================== -->
<!-- Camel JMS component and ActiveMQ setup -->
<!-- ================================================================== -->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://testbroker"/>
</bean>
</beans>

View file

@ -0,0 +1,92 @@
package sample.camel
import se.scalablesolutions.akka.actor.{Actor, RemoteActor}
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging
/**
* Client-initiated remote actor.
*/
class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote1"
protected def receive = {
case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
}
}
/**
* Server-initiated remote actor.
*/
class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote2"
protected def receive = {
case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
}
}
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
override def oneway = false // default
override def async = true // default
protected def receive = produce
}
class Consumer1 extends Actor with Consumer with Logging {
def endpointUri = "file:data/input"
def receive = {
case msg: Message => log.info("received %s" format msg.bodyAs(classOf[String]))
}
}
@consume("jetty:http://0.0.0.0:8877/camel/test1")
class Consumer2 extends Actor {
def receive = {
case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String]))
}
}
class Consumer3(transformer: Actor) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
case msg: Message => transformer.forward(msg.setBodyAs(classOf[String]))
}
}
class Transformer(producer: Actor) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
}
}
class Subscriber(name:String, uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => log.info("%s received: %s" format (name, msg.body))
}
}
class Publisher(name: String, uri: String) extends Actor with Producer {
id = name
def endpointUri = uri
override def oneway = true
protected def receive = produce
}
class PublisherBridge(uri: String, publisher: Actor) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => {
publisher ! msg.bodyAs(classOf[String])
reply("message published")
}
}
}

View file

@ -0,0 +1,28 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient
/**
* @author Martin Krasser
*/
object Application1 {
//
// TODO: completion of example
//
def main(args: Array[String]) {
implicit val sender: Option[Actor] = None
val actor1 = new RemoteActor1
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
actor1.start
println(actor1 !! Message("actor1"))
println(actor2 !! Message("actor2"))
}
}

View file

@ -0,0 +1,22 @@
package sample.camel
import se.scalablesolutions.akka.camel.service.CamelService
import se.scalablesolutions.akka.remote.RemoteNode
/**
* @author Martin Krasser
*/
object Application2 {
//
// TODO: completion of example
//
def main(args: Array[String]) {
val camelService = CamelService.newInstance
camelService.load
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", new RemoteActor2().start)
}
}

View file

@ -0,0 +1,70 @@
package sample.camel
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.config.ScalaConfig._
/**
* @author Martin Krasser
*/
class Boot {
// Create CamelContext with Spring-based registry and custom route builder
val context = new ClassPathXmlApplicationContext("/sample-camel-context.xml", getClass)
val registry = new ApplicationContextRegistry(context)
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
// Basic example
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(new Consumer1, LifeCycle(Permanent)) ::
Supervise(new Consumer2, LifeCycle(Permanent)) :: Nil))
factory.newInstance.start
// Routing example
val producer = new Producer1
val mediator = new Transformer(producer)
val consumer = new Consumer3(mediator)
producer.start
mediator.start
consumer.start
// Publish subscribe example
val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target"
val cometdSubscriber = new Subscriber("cometd-subscriber", cometdUri).start
val cometdPublisher = new Publisher("cometd-publisher", cometdUri).start
val jmsUri = "jms:topic:test"
val jmsSubscriber1 = new Subscriber("jms-subscriber-1", jmsUri).start
val jmsSubscriber2 = new Subscriber("jms-subscriber-2", jmsUri).start
val jmsPublisher = new Publisher("jms-publisher", jmsUri).start
val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start
val jmsPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher).start
}
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}

View file

@ -8,9 +8,9 @@ import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.persistence.common.PersistentMap;
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage;

View file

@ -8,9 +8,9 @@ import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.stm.TransactionalState;
import se.scalablesolutions.akka.stm.TransactionalMap;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface consume {
public abstract String value();
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.annotation;
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;

View file

@ -19,8 +19,9 @@
# FQN to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.java.Boot",
"sample.scala.Boot",
boot = ["sample.camel.Boot",
"sample.java.Boot",
"sample.scala.Boot",
"se.scalablesolutions.akka.security.samples.Boot"]
<actor>

View file

@ -79,12 +79,13 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_core)
lazy val akka_rest = project("akka-rest", "akka-rest", new AkkaRestProject(_), akka_core)
lazy val akka_comet = project("akka-comet", "akka-comet", new AkkaCometProject(_), akka_rest)
lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_core)
lazy val akka_patterns = project("akka-patterns", "akka-patterns", new AkkaPatternsProject(_), akka_core)
lazy val akka_security = project("akka-security", "akka-security", new AkkaSecurityProject(_), akka_core)
lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_))
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_))
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_core, akka_rest, akka_persistence, akka_cluster, akka_amqp, akka_security, akka_comet, akka_patterns)
akka_core, akka_rest, akka_persistence, akka_cluster, akka_amqp, akka_security, akka_comet, akka_camel, akka_patterns)
// functional tests in java
lazy val akka_fun_test = project("akka-fun-test-java", "akka-fun-test-java", new AkkaFunTestProject(_), akka_kernel)
@ -112,6 +113,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-cluster-jgroups_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-rest_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-comet_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-camel_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-security_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-amqp_%s-%s.jar".format(defScalaVersion.value, version) +
" dist/akka-patterns_%s-%s.jar".format(defScalaVersion.value, version) +
@ -127,9 +129,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
Credentials(Path.userHome / ".akka_publish_credentials", log)
override def managedStyle = ManagedStyle.Maven
val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/"
val publishTo = "Scalable Solutions Maven Repository" at "~/tmp/akka"
// val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/"
val sourceArtifact = Artifact(artifactID, "src", "jar", Some("sources"), Nil, None)
val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None)
// val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None)
override def packageDocsJar = defaultJarPath("-javadoc.jar")
override def packageSrcJar= defaultJarPath("-sources.jar")
@ -207,6 +210,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying")
}
class AkkaCamelProject(info: ProjectInfo) extends DefaultProject(info) {
val camel_core = "org.apache.camel" % "camel-core" % "2.2.0" % "compile"
lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying")
}
class AkkaPatternsProject(info: ProjectInfo) extends DefaultProject(info) {
// testing
val scalatest = "org.scalatest" % "scalatest" % "1.0" % "test"
@ -305,7 +313,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val lift_util = "net.liftweb" % "lift-util" % "1.1-M6" % "compile"
val servlet = "javax.servlet" % "servlet-api" % "2.5" % "compile"
// testing
val jetty = "org.mortbay.jetty" % "jetty" % "6.1.6" % "test"
val jetty = "org.mortbay.jetty" % "jetty" % "6.1.11" % "test"
val junit = "junit" % "junit" % "4.5" % "test"
lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying")
}
@ -319,6 +327,17 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying")
}
class AkkaSampleCamelProject(info: ProjectInfo) extends DefaultProject(info) {
val jetty = "org.mortbay.jetty" % "jetty" % "6.1.11" % "compile"
val jetty_client = "org.mortbay.jetty" % "jetty-client" % "6.1.11" % "compile"
val camel_http = "org.apache.camel" % "camel-http" % "2.2.0" % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.2.0" % "compile" intransitive()
val camel_jms = "org.apache.camel" % "camel-jms" % "2.2.0" % "compile"
val camel_cometd = "org.apache.camel" % "camel-cometd" % "2.2.0" % "compile"
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.0" % "compile"
lazy val dist = deployTask(info, deployPath) dependsOn(`package`) describedAs("Deploying")
}
class AkkaSampleSecurityProject(info: ProjectInfo) extends DefaultProject(info) {
val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile"
val jsr250 = "javax.annotation" % "jsr250-api" % "1.0"
@ -330,6 +349,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift", new AkkaSampleLiftProject(_), akka_kernel)
lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java", new AkkaSampleRestJavaProject(_), akka_kernel)
lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala", new AkkaSampleRestScalaProject(_), akka_kernel)
lazy val akka_sample_camel = project("akka-sample-camel", "akka-sample-camel", new AkkaSampleCamelProject(_), akka_kernel)
lazy val akka_sample_security = project("akka-sample-security", "akka-sample-security", new AkkaSampleSecurityProject(_), akka_kernel)
}

16
scripts/run_akka.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
cd $AKKA_HOME
VERSION=akka_2.7.7-0.7-SNAPSHOT
TARGET_DIR=dist/$1
shift 1
VMARGS=$@
if [ -d $TARGET_DIR ]; then
cd $TARGET_DIR
else
unzip dist/${VERSION}.zip -d $TARGET_DIR
cd $TARGET_DIR
fi
export AKKA_HOME=`pwd`
java -jar ${VMARGS} ${VERSION}.jar