Merge branch 'master' of git@github.com:jboner/akka
This commit is contained in:
commit
aa82abd0bc
13 changed files with 337 additions and 102 deletions
|
|
@ -29,7 +29,8 @@ import java.lang.reflect.Field
|
|||
import scala.reflect.BeanProperty
|
||||
import scala.collection.immutable.Stack
|
||||
|
||||
object ActorRefStatus {
|
||||
private[akka] object ActorRefInternals {
|
||||
|
||||
/** LifeCycles for ActorRefs
|
||||
*/
|
||||
private[akka] sealed trait StatusType
|
||||
|
|
@ -37,8 +38,13 @@ object ActorRefStatus {
|
|||
object RUNNING extends StatusType
|
||||
object BEING_RESTARTED extends StatusType
|
||||
object SHUTDOWN extends StatusType
|
||||
|
||||
case class TransactorConfig(factory: Option[TransactionFactory] = None, config: TransactionConfig = DefaultGlobalTransactionConfig)
|
||||
val DefaultTransactorConfig = TransactorConfig()
|
||||
val NoTransactionConfig = TransactorConfig()
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* ActorRef is an immutable and serializable handle to an Actor.
|
||||
* <p/>
|
||||
|
|
@ -77,7 +83,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
|
|||
@volatile
|
||||
protected[akka] var _uuid = newUuid
|
||||
@volatile
|
||||
protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
|
||||
protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED
|
||||
@volatile
|
||||
protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
|
||||
@volatile
|
||||
|
|
@ -172,22 +178,18 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
|
|||
* start if there is no one running, else it joins the existing transaction.
|
||||
*/
|
||||
@volatile
|
||||
protected[akka] var isTransactor = false
|
||||
protected[akka] var transactorConfig = ActorRefInternals.NoTransactionConfig
|
||||
|
||||
/**
|
||||
* Configuration for TransactionFactory. User overridable.
|
||||
* Returns true if this Actor is a Transactor
|
||||
*/
|
||||
@volatile
|
||||
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
|
||||
def isTransactor: Boolean = {
|
||||
val c = transactorConfig
|
||||
(c ne ActorRefInternals.NoTransactionConfig) && (c ne null) //Could possibly be null if called before var init
|
||||
}
|
||||
|
||||
/**
|
||||
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
|
||||
*/
|
||||
@volatile
|
||||
private[akka] var _transactionFactory: Option[TransactionFactory] = None
|
||||
|
||||
/**
|
||||
* This is a reference to the message currently being processed by the actor
|
||||
* This is a reference to the message currently being processed by the actor
|
||||
*/
|
||||
@volatile
|
||||
protected[akka] var currentMessage: MessageInvocation = null
|
||||
|
|
@ -220,25 +222,25 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
|
|||
/**
|
||||
* Is the actor being restarted?
|
||||
*/
|
||||
def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED
|
||||
def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED
|
||||
|
||||
/**
|
||||
* Is the actor running?
|
||||
*/
|
||||
def isRunning: Boolean = _status match {
|
||||
case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true
|
||||
case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true
|
||||
case _ => false
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the actor shut down?
|
||||
*/
|
||||
def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN
|
||||
def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN
|
||||
|
||||
/**
|
||||
* Is the actor ever started?
|
||||
*/
|
||||
def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED
|
||||
def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED
|
||||
|
||||
/**
|
||||
* Is the actor able to handle the message passed in as arguments?
|
||||
|
|
@ -677,7 +679,7 @@ class LocalActorRef private[akka] (
|
|||
_uuid = __uuid
|
||||
id = __id
|
||||
homeAddress = (__hostname, __port)
|
||||
isTransactor = __isTransactor
|
||||
transactorConfig = if (__isTransactor) ActorRefInternals.DefaultTransactorConfig else ActorRefInternals.NoTransactionConfig
|
||||
timeout = __timeout
|
||||
receiveTimeout = __receiveTimeout
|
||||
lifeCycle = __lifeCycle
|
||||
|
|
@ -745,7 +747,10 @@ class LocalActorRef private[akka] (
|
|||
* However, it will always participate in an existing transaction.
|
||||
*/
|
||||
def makeTransactionRequired() = guard.withGuard {
|
||||
if (!isRunning || isBeingRestarted) isTransactor = true
|
||||
if (!isRunning || isBeingRestarted) {
|
||||
if (transactorConfig eq ActorRefInternals.NoTransactionConfig)
|
||||
transactorConfig = ActorRefInternals.DefaultTransactorConfig
|
||||
}
|
||||
else throw new ActorInitializationException(
|
||||
"Can not make actor transaction required after it has been started")
|
||||
}
|
||||
|
|
@ -753,8 +758,8 @@ class LocalActorRef private[akka] (
|
|||
/**
|
||||
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def transactionConfig_=(config: TransactionConfig) = guard.withGuard {
|
||||
if (!isRunning || isBeingRestarted) _transactionConfig = config
|
||||
def transactionConfig_=(configuration: TransactionConfig) = guard.withGuard {
|
||||
if (!isRunning || isBeingRestarted) transactorConfig = transactorConfig.copy(config = configuration)
|
||||
else throw new ActorInitializationException(
|
||||
"Cannot set transaction configuration for actor after it has been started")
|
||||
}
|
||||
|
|
@ -762,7 +767,7 @@ class LocalActorRef private[akka] (
|
|||
/**
|
||||
* Get the transaction configuration for this actor.
|
||||
*/
|
||||
def transactionConfig: TransactionConfig = _transactionConfig
|
||||
def transactionConfig: TransactionConfig = transactorConfig.config
|
||||
|
||||
/**
|
||||
* Set the contact address for this actor. This is used for replying to messages
|
||||
|
|
@ -785,10 +790,10 @@ class LocalActorRef private[akka] (
|
|||
if (!isRunning) {
|
||||
dispatcher.register(this)
|
||||
dispatcher.start
|
||||
if (isTransactor) {
|
||||
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
|
||||
}
|
||||
_status = ActorRefStatus.RUNNING
|
||||
if (isTransactor)
|
||||
transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id)))
|
||||
|
||||
_status = ActorRefInternals.RUNNING
|
||||
|
||||
//If we are not currently creating this ActorRef instance
|
||||
if ((actorInstance ne null) && (actorInstance.get ne null))
|
||||
|
|
@ -807,8 +812,8 @@ class LocalActorRef private[akka] (
|
|||
receiveTimeout = None
|
||||
cancelReceiveTimeout
|
||||
dispatcher.unregister(this)
|
||||
_transactionFactory = None
|
||||
_status = ActorRefStatus.SHUTDOWN
|
||||
transactorConfig = transactorConfig.copy(factory = None)
|
||||
_status = ActorRefInternals.SHUTDOWN
|
||||
actor.postStop
|
||||
ActorRegistry.unregister(this)
|
||||
if (isRemotingEnabled) {
|
||||
|
|
@ -1066,7 +1071,7 @@ class LocalActorRef private[akka] (
|
|||
|
||||
stop
|
||||
} else {
|
||||
_status = ActorRefStatus.BEING_RESTARTED
|
||||
_status = ActorRefInternals.BEING_RESTARTED
|
||||
val failedActor = actorInstance.get
|
||||
guard.withGuard {
|
||||
lifeCycle match {
|
||||
|
|
@ -1081,7 +1086,7 @@ class LocalActorRef private[akka] (
|
|||
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
|
||||
else restartActor(failedActor, reason)
|
||||
|
||||
_status = ActorRefStatus.RUNNING
|
||||
_status = ActorRefInternals.RUNNING
|
||||
|
||||
// update restart parameters
|
||||
if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0)
|
||||
|
|
@ -1179,12 +1184,13 @@ class LocalActorRef private[akka] (
|
|||
private def dispatch[T](messageHandle: MessageInvocation) = {
|
||||
Actor.log.trace("Invoking actor with message: %s\n", messageHandle)
|
||||
val message = messageHandle.message //serializeMessage(messageHandle.message)
|
||||
val isXactor = isTransactor
|
||||
var topLevelTransaction = false
|
||||
val txSet: Option[CountDownCommitBarrier] =
|
||||
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
|
||||
else {
|
||||
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
|
||||
if (isTransactor) {
|
||||
if (isXactor) {
|
||||
Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString +
|
||||
"\n\twith message " + messageHandle)
|
||||
Some(createNewTransactionSet)
|
||||
|
|
@ -1194,8 +1200,8 @@ class LocalActorRef private[akka] (
|
|||
|
||||
try {
|
||||
cancelReceiveTimeout // FIXME: leave this here?
|
||||
if (isTransactor) {
|
||||
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
|
||||
if (isXactor) {
|
||||
val txFactory = transactorConfig.factory.getOrElse(DefaultGlobalTransactionFactory)
|
||||
atomic(txFactory) {
|
||||
actor(message)
|
||||
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
|
||||
|
|
@ -1236,7 +1242,7 @@ class LocalActorRef private[akka] (
|
|||
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
|
||||
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
|
||||
|
||||
_status = ActorRefStatus.BEING_RESTARTED
|
||||
_status = ActorRefInternals.BEING_RESTARTED
|
||||
// abort transaction set
|
||||
if (isTransactionSetInScope) {
|
||||
val txSet = getTransactionSetInScope
|
||||
|
|
@ -1373,13 +1379,13 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
}
|
||||
|
||||
def start: ActorRef = synchronized {
|
||||
_status = ActorRefStatus.RUNNING
|
||||
_status = ActorRefInternals.RUNNING
|
||||
this
|
||||
}
|
||||
|
||||
def stop: Unit = synchronized {
|
||||
if (_status == ActorRefStatus.RUNNING) {
|
||||
_status = ActorRefStatus.SHUTDOWN
|
||||
if (_status == ActorRefInternals.RUNNING) {
|
||||
_status = ActorRefInternals.SHUTDOWN
|
||||
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ object CamelServiceManager {
|
|||
*/
|
||||
def mandatoryService =
|
||||
if (_current.isDefined) _current.get
|
||||
else throw new IllegalStateException("co current Camel service")
|
||||
else throw new IllegalStateException("co current CamelService")
|
||||
|
||||
/**
|
||||
* Returns <code>Some(CamelService)</code> (containing the current CamelService)
|
||||
|
|
|
|||
|
|
@ -1,30 +1,35 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.camel
|
||||
|
||||
import java.util.{Map => JMap, Set => JSet}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
import org.apache.camel.{Exchange, Message => CamelMessage}
|
||||
import org.apache.camel.util.ExchangeHelper
|
||||
|
||||
import se.scalablesolutions.akka.japi.{Function => JFunction}
|
||||
|
||||
/**
|
||||
* An immutable representation of a Camel message. Actor classes that mix in
|
||||
* se.scalablesolutions.akka.camel.Producer or
|
||||
* se.scalablesolutions.akka.camel.Consumer usually use this message type for communication.
|
||||
* An immutable representation of a Camel message.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
|
||||
|
||||
/**
|
||||
* Returns the body of the message converted to the type given by the <code>clazz</code>
|
||||
* argument. Conversion is done using Camel's type converter. The type converter is obtained
|
||||
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
|
||||
* initialization of CamelContextManager.
|
||||
*
|
||||
* @see CamelContextManager.
|
||||
* Creates a Message with given body and empty headers map.
|
||||
*/
|
||||
def bodyAs[T](clazz: Class[T]): T =
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
|
||||
def this(body: Any) = this(body, Map.empty[String, Any])
|
||||
|
||||
/**
|
||||
* Creates a Message with given body and headers map. A copy of the headers map is made.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap)
|
||||
|
||||
/**
|
||||
* Returns the body of the message converted to the type <code>T</code>. Conversion is done
|
||||
|
|
@ -34,76 +39,143 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
|
|||
*
|
||||
* @see CamelContextManager.
|
||||
*/
|
||||
def bodyAs[T](implicit m: Manifest[T]): T =
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], body)
|
||||
def bodyAs[T](implicit m: Manifest[T]): T = getBodyAs(m.erasure.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Returns the body of the message converted to the type as given by the <code>clazz</code>
|
||||
* parameter. Conversion is done using Camel's type converter. The type converter is obtained
|
||||
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
|
||||
* initialization of CamelContextManager.
|
||||
* <p>
|
||||
* Java API
|
||||
*
|
||||
* @see CamelContextManager.
|
||||
*/
|
||||
def getBodyAs[T](clazz: Class[T]): T =
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
|
||||
|
||||
/**
|
||||
* Returns those headers from this message whose name is contained in <code>names</code>.
|
||||
*/
|
||||
def headers(names: Set[String]): Map[String, Any] = headers.filter(names contains _._1)
|
||||
|
||||
/**
|
||||
* Returns those headers from this message whose name is contained in <code>names</code>.
|
||||
* The returned headers map is backed up by an immutable headers map. Any attempt to modify
|
||||
* the returned map will throw an exception.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def getHeaders(names: JSet[String]): JMap[String, Any] = headers.filter(names contains _._1)
|
||||
|
||||
/**
|
||||
* Returns all headers from this message. The returned headers map is backed up by this
|
||||
* message's immutable headers map. Any attempt to modify the returned map will throw an
|
||||
* exception.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def getHeaders: JMap[String, Any] = headers
|
||||
|
||||
/**
|
||||
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
|
||||
* if the header doesn't exist.
|
||||
*/
|
||||
def header(name: String): Any = headers(name)
|
||||
|
||||
/**
|
||||
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
|
||||
* if the header doesn't exist.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def getHeader(name: String): Any = header(name)
|
||||
|
||||
/**
|
||||
* Returns the header with given <code>name</code> converted to type <code>T</code>. Throws
|
||||
* <code>NoSuchElementException</code> if the header doesn't exist.
|
||||
*/
|
||||
def headerAs[T](name: String)(implicit m: Manifest[T]): T =
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], header(name))
|
||||
getHeaderAs(name, m.erasure.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Returns the header with given <code>name</code> converted to type given by the <code>clazz</code>
|
||||
* argument. Throws <code>NoSuchElementException</code> if the header doesn't exist.
|
||||
* Returns the header with given <code>name</code> converted to type as given by the <code>clazz</code>
|
||||
* parameter. Throws <code>NoSuchElementException</code> if the header doesn't exist.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def headerAs[T](name: String, clazz: Class[T]): T =
|
||||
def getHeaderAs[T](name: String, clazz: Class[T]): T =
|
||||
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, header(name))
|
||||
|
||||
/**
|
||||
* Creates a Message with a new <code>body</code> using a <code>transformer</code> function.
|
||||
* Creates a Message with a transformed body using a <code>transformer</code> function.
|
||||
*/
|
||||
def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A]))
|
||||
|
||||
/**
|
||||
* Creates a Message with a new <code>body</code> converted to type <code>clazz</code>.
|
||||
*
|
||||
* @see Message#bodyAs(Class)
|
||||
* Creates a Message with a transformed body using a <code>transformer</code> function.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
@deprecated("use setBodyAs[T](implicit m: Manifest[T]): Message instead")
|
||||
def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz))
|
||||
def transformBody[A](transformer: JFunction[A, Any]): Message = setBody(transformer(body.asInstanceOf[A]))
|
||||
|
||||
/**
|
||||
* Creates a Message with a new <code>body</code> converted to type <code>T</code>.
|
||||
*
|
||||
* @see Message#bodyAs(Class)
|
||||
* Creates a Message with current <code>body</code> converted to type <code>T</code>.
|
||||
*/
|
||||
def setBodyAs[T](implicit m: Manifest[T]): Message = setBody(bodyAs[T])
|
||||
def setBodyAs[T](implicit m: Manifest[T]): Message = setBodyAs(m.erasure.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Creates a Message with a new <code>body</code>.
|
||||
* Creates a Message with current <code>body</code> converted to type <code>clazz</code>.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def setBodyAs[T](clazz: Class[T]): Message = setBody(getBodyAs(clazz))
|
||||
|
||||
/**
|
||||
* Creates a Message with a given <code>body</code>.
|
||||
*/
|
||||
def setBody(body: Any) = new Message(body, this.headers)
|
||||
|
||||
/**
|
||||
* Creates a new Message with new <code>headers</code>.
|
||||
* Creates a new Message with given <code>headers</code>.
|
||||
*/
|
||||
def setHeaders(headers: Map[String, Any]) = copy(this.body, headers)
|
||||
def setHeaders(headers: Map[String, Any]): Message = copy(this.body, headers)
|
||||
|
||||
/**
|
||||
* Creates a new Message with the <code>headers</code> argument added to the existing headers.
|
||||
* Creates a new Message with given <code>headers</code>. A copy of the headers map is made.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def addHeaders(headers: Map[String, Any]) = copy(this.body, this.headers ++ headers)
|
||||
def setHeaders(headers: JMap[String, Any]): Message = setHeaders(headers.toMap)
|
||||
|
||||
/**
|
||||
* Creates a new Message with the <code>header</code> argument added to the existing headers.
|
||||
* Creates a new Message with given <code>headers</code> added to the current headers.
|
||||
*/
|
||||
def addHeader(header: (String, Any)) = copy(this.body, this.headers + header)
|
||||
def addHeaders(headers: Map[String, Any]): Message = copy(this.body, this.headers ++ headers)
|
||||
|
||||
/**
|
||||
* Creates a new Message where the header with name <code>headerName</code> is removed from
|
||||
* Creates a new Message with given <code>headers</code> added to the current headers.
|
||||
* A copy of the headers map is made.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def addHeaders(headers: JMap[String, Any]): Message = addHeaders(headers.toMap)
|
||||
|
||||
/**
|
||||
* Creates a new Message with the given <code>header</code> added to the current headers.
|
||||
*/
|
||||
def addHeader(header: (String, Any)): Message = copy(this.body, this.headers + header)
|
||||
|
||||
/**
|
||||
* Creates a new Message with the given header, represented by <code>name</code> and
|
||||
* <code>value</code> added to the existing headers.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def addHeader(name: String, value: Any): Message = addHeader((name, value))
|
||||
|
||||
/**
|
||||
* Creates a new Message where the header with given <code>headerName</code> is removed from
|
||||
* the existing headers.
|
||||
*/
|
||||
def removeHeader(headerName: String) = copy(this.body, this.headers - headerName)
|
||||
|
|
@ -127,7 +199,7 @@ object Message {
|
|||
/**
|
||||
* Creates a new Message with <code>body</code> as message body and an empty header map.
|
||||
*/
|
||||
def apply(body: Any) = new Message(body)
|
||||
//def apply(body: Any) = new Message(body)
|
||||
|
||||
/**
|
||||
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type
|
||||
|
|
@ -147,15 +219,43 @@ object Message {
|
|||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty)
|
||||
case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty) {
|
||||
|
||||
/**
|
||||
* Creates a Failure with cause body and empty headers map.
|
||||
*/
|
||||
def this(cause: Exception) = this(cause, Map.empty[String, Any])
|
||||
|
||||
/**
|
||||
* Creates a Failure with given cause and headers map. A copy of the headers map is made.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def this(cause: Exception, headers: JMap[String, Any]) = this(cause, headers.toMap)
|
||||
|
||||
/**
|
||||
* Returns the cause of this Failure.
|
||||
* <p>
|
||||
* Java API.
|
||||
*/
|
||||
def getCause = cause
|
||||
|
||||
/**
|
||||
* Returns all headers from this failure message. The returned headers map is backed up by
|
||||
* this message's immutable headers map. Any attempt to modify the returned map will throw
|
||||
* an exception.
|
||||
* <p>
|
||||
* Java API
|
||||
*/
|
||||
def getHeaders: JMap[String, Any] = headers
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
|
||||
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
class CamelExchangeAdapter(exchange: Exchange) {
|
||||
|
||||
import CamelMessageConversion.toMessageAdapter
|
||||
|
||||
/**
|
||||
|
|
@ -256,10 +356,7 @@ class CamelMessageAdapter(val cm: CamelMessage) {
|
|||
*/
|
||||
def toMessage(headers: Map[String, Any]): Message = Message(cm.getBody, cmHeaders(headers, cm))
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) =
|
||||
headers ++ cm.getHeaders
|
||||
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = headers ++ cm.getHeaders
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -206,12 +206,12 @@ private[akka] object AsyncCallbackAdapter {
|
|||
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
|
||||
|
||||
def start = {
|
||||
_status = ActorRefStatus.RUNNING
|
||||
_status = ActorRefInternals.RUNNING
|
||||
this
|
||||
}
|
||||
|
||||
def stop() = {
|
||||
_status = ActorRefStatus.SHUTDOWN
|
||||
_status = ActorRefInternals.SHUTDOWN
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,129 @@
|
|||
package se.scalablesolutions.akka.camel;
|
||||
|
||||
import org.apache.camel.NoTypeConversionAvailableException;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import se.scalablesolutions.akka.camel.CamelContextManager;
|
||||
import se.scalablesolutions.akka.camel.Message;
|
||||
import se.scalablesolutions.akka.japi.Function;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.*;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class MessageJavaTestBase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() {
|
||||
CamelContextManager.init();
|
||||
}
|
||||
|
||||
@Test public void shouldConvertDoubleBodyToString() {
|
||||
assertEquals("1.4", new Message("1.4").getBodyAs(String.class));
|
||||
}
|
||||
|
||||
@Test(expected=NoTypeConversionAvailableException.class)
|
||||
public void shouldThrowExceptionWhenConvertingDoubleBodyToInputStream() {
|
||||
new Message(1.4).getBodyAs(InputStream.class);
|
||||
}
|
||||
|
||||
@Test public void shouldReturnDoubleHeader() {
|
||||
Message message = new Message("test" , createMap("test", 1.4));
|
||||
assertEquals(1.4, message.getHeader("test"));
|
||||
}
|
||||
|
||||
@Test public void shouldConvertDoubleHeaderToString() {
|
||||
Message message = new Message("test" , createMap("test", 1.4));
|
||||
assertEquals("1.4", message.getHeaderAs("test", String.class));
|
||||
}
|
||||
|
||||
@Test public void shouldReturnSubsetOfHeaders() {
|
||||
Message message = new Message("test" , createMap("A", "1", "B", "2"));
|
||||
assertEquals(createMap("B", "2"), message.getHeaders(createSet("B")));
|
||||
}
|
||||
|
||||
@Test(expected=UnsupportedOperationException.class)
|
||||
public void shouldReturnSubsetOfHeadersUnmodifiable() {
|
||||
Message message = new Message("test" , createMap("A", "1", "B", "2"));
|
||||
message.getHeaders(createSet("B")).put("x", "y");
|
||||
}
|
||||
|
||||
@Test public void shouldReturnAllHeaders() {
|
||||
Message message = new Message("test" , createMap("A", "1", "B", "2"));
|
||||
assertEquals(createMap("A", "1", "B", "2"), message.getHeaders());
|
||||
}
|
||||
|
||||
@Test(expected=UnsupportedOperationException.class)
|
||||
public void shouldReturnAllHeadersUnmodifiable() {
|
||||
Message message = new Message("test" , createMap("A", "1", "B", "2"));
|
||||
message.getHeaders().put("x", "y");
|
||||
}
|
||||
|
||||
@Test public void shouldTransformBodyAndPreserveHeaders() {
|
||||
assertEquals(
|
||||
new Message("ab", createMap("A", "1")),
|
||||
new Message("a" , createMap("A", "1")).transformBody((Function)new TestTransformer()));
|
||||
}
|
||||
|
||||
@Test public void shouldConvertBodyAndPreserveHeaders() {
|
||||
assertEquals(
|
||||
new Message("1.4", createMap("A", "1")),
|
||||
new Message(1.4 , createMap("A", "1")).setBodyAs(String.class));
|
||||
}
|
||||
|
||||
@Test public void shouldSetBodyAndPreserveHeaders() {
|
||||
assertEquals(
|
||||
new Message("test2" , createMap("A", "1")),
|
||||
new Message("test1" , createMap("A", "1")).setBody("test2"));
|
||||
}
|
||||
|
||||
@Test public void shouldSetHeadersAndPreserveBody() {
|
||||
assertEquals(
|
||||
new Message("test1" , createMap("C", "3")),
|
||||
new Message("test1" , createMap("A", "1")).setHeaders(createMap("C", "3")));
|
||||
}
|
||||
|
||||
@Test public void shouldAddHeaderAndPreserveBodyAndHeaders() {
|
||||
assertEquals(
|
||||
new Message("test1" , createMap("A", "1", "B", "2")),
|
||||
new Message("test1" , createMap("A", "1")).addHeader("B", "2"));
|
||||
}
|
||||
|
||||
@Test public void shouldAddHeadersAndPreserveBodyAndHeaders() {
|
||||
assertEquals(
|
||||
new Message("test1" , createMap("A", "1", "B", "2")),
|
||||
new Message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2")));
|
||||
}
|
||||
|
||||
@Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() {
|
||||
assertEquals(
|
||||
new Message("test1" , createMap("A", "1")),
|
||||
new Message("test1" , createMap("A", "1", "B", "2")).removeHeader("B"));
|
||||
}
|
||||
|
||||
private static Set<String> createSet(String... entries) {
|
||||
HashSet<String> set = new HashSet<String>();
|
||||
set.addAll(Arrays.asList(entries));
|
||||
return set;
|
||||
}
|
||||
|
||||
private static Map<String, Object> createMap(Object... pairs) {
|
||||
HashMap<String, Object> map = new HashMap<String, Object>();
|
||||
for (int i = 0; i < pairs.length; i += 2) {
|
||||
map.put((String)pairs[i], pairs[i+1]);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
private static class TestTransformer implements Function<String, String> {
|
||||
public String apply(String param) {
|
||||
return param + "b";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -21,8 +21,8 @@ public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor {
|
|||
|
||||
public void onReceive(Object message) {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.bodyAs(String.class);
|
||||
String header = msg.headerAs("test", String.class);
|
||||
String body = msg.getBodyAs(String.class);
|
||||
String header = msg.getHeaderAs("test", String.class);
|
||||
getContext().replySafe(String.format("%s %s", body, header));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
|
|||
|
||||
public void onReceive(Object message) {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.bodyAs(String.class);
|
||||
String header = msg.headerAs("test", String.class);
|
||||
String body = msg.getBodyAs(String.class);
|
||||
String header = msg.getHeaderAs("test", String.class);
|
||||
getContext().replySafe(String.format("%s %s", body, header));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
|
|||
|
||||
public void onReceive(Object message) {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.bodyAs(String.class);
|
||||
String header = msg.headerAs("test", String.class);
|
||||
String body = msg.getBodyAs(String.class);
|
||||
String header = msg.getHeaderAs("test", String.class);
|
||||
getContext().replySafe(String.format("%s %s", body, header));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor {
|
|||
@Override
|
||||
public void onReceiveAfterProduce(Object message) {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.bodyAs(String.class);
|
||||
String body = msg.getBodyAs(String.class);
|
||||
CamelContextManager.getMandatoryTemplate().sendBody("direct:forward-test-1", body);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
5
akka-camel/src/test/scala/MessageJavaTest.scala
Normal file
5
akka-camel/src/test/scala/MessageJavaTest.scala
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
package se.scalablesolutions.akka.camel
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class MessageJavaTest extends MessageJavaTestBase with JUnitSuite
|
||||
|
|
@ -10,17 +10,16 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
|
||||
class MessageTest extends JUnitSuite with BeforeAndAfterAll {
|
||||
class MessageScalaTest extends JUnitSuite with BeforeAndAfterAll {
|
||||
override protected def beforeAll = CamelContextManager.init
|
||||
|
||||
@Test def shouldConvertDoubleBodyToString = {
|
||||
assertEquals("1.4", Message(1.4, null).bodyAs[String])
|
||||
assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String]))
|
||||
assertEquals("1.4", Message(1.4).bodyAs[String])
|
||||
}
|
||||
|
||||
@Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream {
|
||||
intercept[NoTypeConversionAvailableException] {
|
||||
Message(1.4, null).bodyAs[InputStream]
|
||||
Message(1.4).bodyAs[InputStream]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -32,7 +31,6 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll {
|
|||
@Test def shouldConvertDoubleHeaderToString = {
|
||||
val message = Message("test" , Map("test" -> 1.4))
|
||||
assertEquals("1.4", message.headerAs[String]("test"))
|
||||
assertEquals("1.4", message.headerAs("test", classOf[String]))
|
||||
}
|
||||
|
||||
@Test def shouldReturnSubsetOfHeaders = {
|
||||
|
|
@ -43,7 +41,7 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll {
|
|||
@Test def shouldTransformBodyAndPreserveHeaders = {
|
||||
assertEquals(
|
||||
Message("ab", Map("A" -> "1")),
|
||||
Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b"))
|
||||
Message("a" , Map("A" -> "1")).transformBody((body: String) => body + "b"))
|
||||
}
|
||||
|
||||
@Test def shouldConvertBodyAndPreserveHeaders = {
|
||||
|
|
@ -246,7 +246,7 @@ object ProducerFeatureTest {
|
|||
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
override protected def receiveBeforeProduce = {
|
||||
case msg: Message => if (upper) msg.transformBody[String] { _.toUpperCase } else msg
|
||||
case msg: Message => if (upper) msg.transformBody { body: String => body.toUpperCase } else msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -261,7 +261,7 @@ object ProducerFeatureTest {
|
|||
protected def receive = {
|
||||
case msg: Message => msg.body match {
|
||||
case "fail" => self.reply(Failure(new Exception("failure"), msg.headers))
|
||||
case _ => self.reply(msg.transformBody[String] { "received %s" format _ })
|
||||
case _ => self.reply(msg.transformBody { body: String => "received %s" format body })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class Consumer5 extends Actor with Consumer with Logging {
|
|||
|
||||
class Transformer(producer: ActorRef) extends Actor {
|
||||
protected def receive = {
|
||||
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
|
||||
case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -137,7 +137,7 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer {
|
|||
|
||||
class HttpTransformer extends Actor {
|
||||
protected def receive = {
|
||||
case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")})
|
||||
case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")})
|
||||
case msg: Failure => self.reply(msg)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue