Merge branch 'master' of github.com:jboner/akka into ticket-443

This commit is contained in:
ticktock 2010-10-03 19:03:14 -04:00
commit 09c2fb87a1
6 changed files with 246 additions and 225 deletions

View file

@ -6,11 +6,11 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.{ AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy }
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException}
import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException }
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util._
import ReflectiveAccess._
@ -22,16 +22,15 @@ import org.multiverse.api.exceptions.DeadTransactionException
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
import java.util.{Map => JMap}
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap }
import java.lang.reflect.Field
import scala.reflect.BeanProperty
object ActorRefStatus {
/** LifeCycles for ActorRefs
*/
/** LifeCycles for ActorRefs
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
@ -71,17 +70,17 @@ object ActorRefStatus {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends
ActorRefShared with
TransactionManagement with
Logging with
java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
trait ActorRef extends ActorRefShared with TransactionManagement with Logging with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = newUuid
@volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile
protected[akka] var _uuid = newUuid
@volatile
protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
@volatile
protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
protected[akka] val guard = new ReentrantGuard
/**
@ -94,7 +93,9 @@ trait ActorRef extends
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
@BeanProperty @volatile var id: String = _uuid.toString
@BeanProperty
@volatile
var id: String = _uuid.toString
/**
* User overridable callback/setting.
@ -102,7 +103,9 @@ trait ActorRef extends
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
@BeanProperty @volatile var timeout: Long = Actor.TIMEOUT
@BeanProperty
@volatile
var timeout: Long = Actor.TIMEOUT
/**
* User overridable callback/setting.
@ -110,7 +113,8 @@ trait ActorRef extends
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Option[Long] = None
@volatile
var receiveTimeout: Option[Long] = None
/**
* Akka Java API
@ -162,8 +166,8 @@ trait ActorRef extends
def setLifeCycle(lifeCycle: LifeCycle) = this.lifeCycle = Some(lifeCycle)
def getLifeCycle(): Option[LifeCycle] = lifeCycle
@volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
/**
* Akka Java API
@ -180,11 +184,11 @@ trait ActorRef extends
def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher
def getDispatcher(): MessageDispatcher = dispatcher
/**
* Holds the hot swapped partial function.
*/
@volatile protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
@volatile
protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
/**
* User overridable callback/setting.
@ -192,22 +196,26 @@ trait ActorRef extends
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
* start if there is no one running, else it joins the existing transaction.
*/
@volatile protected[akka] var isTransactor = false
@volatile
protected[akka] var isTransactor = false
/**
* Configuration for TransactionFactory. User overridable.
*/
@volatile protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
@volatile
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
/**
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
*/
@volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None
@volatile
private[akka] var _transactionFactory: Option[TransactionFactory] = None
/**
* This is a reference to the message currently being processed by the actor
*/
@volatile protected[akka] var currentMessage: MessageInvocation = null
@volatile
protected[akka] var currentMessage: MessageInvocation = null
/**
* Comparison only takes uuid into account.
@ -276,7 +284,7 @@ trait ActorRef extends
* </pre>
* <p/>
*/
def sendOneWay(message: AnyRef): Unit = sendOneWay(message,null)
def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null)
/**
* Akka Java API
@ -296,14 +304,14 @@ trait ActorRef extends
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the defualt timeout of the Actor (setTimeout()) and omits the sender reference
*/
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message,timeout,null)
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null)
/**
* Akka Java API
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the defualt timeout of the Actor (setTimeout())
*/
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message,timeout,sender)
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender)
/**
* Akka Java API
@ -320,13 +328,13 @@ trait ActorRef extends
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = {
!!(message,timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException(
!!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tsent to [" + actorClassName +
"]\n\tfrom [" + (if(sender ne null) sender.actorClassName else "nowhere") +
"]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") +
"]\n\twith timeout [" + timeout +
"]\n\ttimed out."))
.asInstanceOf[AnyRef]
.asInstanceOf[AnyRef]
}
/**
@ -334,14 +342,14 @@ trait ActorRef extends
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message,timeout,null)
def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message, timeout, null)
/**
* Akka Java API
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message,timeout,sender)
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message, timeout, sender)
/**
* Akka Java API
@ -354,16 +362,15 @@ trait ActorRef extends
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message,timeout)(Option(sender))
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message, timeout)(Option(sender))
/**
* Akka Java API
* Forwards the message specified to this actor and preserves the original sender of the message
*/
def forward(message: AnyRef, sender: ActorRef): Unit =
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(Some(sender))
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(Some(sender))
/**
* Akka Java API
@ -394,7 +401,6 @@ trait ActorRef extends
*/
def getActorClass(): Class[_ <: Actor] = actorClass
/**
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
@ -443,7 +449,6 @@ trait ActorRef extends
*/
def setTransactionConfig(config: TransactionConfig): Unit = transactionConfig = config
/**
* Get the transaction configuration for this actor.
*/
@ -455,7 +460,6 @@ trait ActorRef extends
*/
def getTransactionConfig(): TransactionConfig = transactionConfig
/**
* Returns the home address and port for this actor.
*/
@ -477,8 +481,7 @@ trait ActorRef extends
* Akka Java API
* Set the home address and port for this actor.
*/
def setHomeAddress(hostname: String, port: Int): Unit = homeAddress = (hostname,port)
def setHomeAddress(hostname: String, port: Int): Unit = homeAddress = (hostname, port)
/**
* Set the home address and port for this actor.
@ -491,7 +494,6 @@ trait ActorRef extends
*/
def setHomeAddress(address: InetSocketAddress): Unit = homeAddress = address
/**
* Returns the remote address for the actor, if any, else None.
*/
@ -504,7 +506,6 @@ trait ActorRef extends
*/
def getRemoteAddress(): Option[InetSocketAddress] = remoteAddress
/**
* Starts up the actor and its message queue.
*/
@ -567,7 +568,7 @@ trait ActorRef extends
*/
def spawnLink(clazz: Class[_ <: Actor]): ActorRef
/**
/**
* Atomically create (from actor class), make it remote, link and start an actor.
* <p/>
* To be invoked from within the actor itself.
@ -601,10 +602,10 @@ trait ActorRef extends
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
protected[akka] def actorInstance: AtomicReference[Actor]
@ -631,7 +632,7 @@ trait ActorRef extends
override def equals(that: Any): Boolean = {
that.isInstanceOf[ActorRef] &&
that.asInstanceOf[ActorRef].uuid == uuid
that.asInstanceOf[ActorRef].uuid == uuid
}
override def toString = "Actor[" + id + ":" + uuid + "]"
@ -645,7 +646,7 @@ trait ActorRef extends
}
protected[akka] def cancelReceiveTimeout = {
if(_futureTimeout.isDefined) {
if (_futureTimeout.isDefined) {
_futureTimeout.get.cancel(true)
_futureTimeout = None
log.debug("Timeout canceled for %s", this)
@ -658,17 +659,24 @@ trait ActorRef extends
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka](
class LocalActorRef private[akka] (
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef with ScalaActorRef {
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
@volatile private var isInInitialization = false
@volatile private var maxNrOfRetriesCount: Int = 0
@volatile private var restartsWithinTimeRangeTimestamp: Long = 0L
@volatile private var _mailbox: AnyRef = _
@volatile
private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile
private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None
@volatile
private[akka] var _supervisor: Option[ActorRef] = None
@volatile
private var isInInitialization = false
@volatile
private var maxNrOfRetriesCount: Int = 0
@volatile
private var restartsWithinTimeRangeTimestamp: Long = 0L
@volatile
private var _mailbox: AnyRef = _
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@ -680,36 +688,36 @@ class LocalActorRef private[akka](
if (isRunning) initializeActorInstance
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
// used only for deserialization
private[akka] def this(__uuid: Uuid,
__id: String,
__hostname: String,
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__factory: () => Actor) = {
this(__factory)
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap
actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this))
start
checkReceiveTimeout
ActorRegistry.register(this)
}
__id: String,
__hostname: String,
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__factory: () => Actor) = {
this(__factory)
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap
actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this))
start
checkReceiveTimeout
ActorRegistry.register(this)
}
// ========= PUBLIC FUNCTIONS =========
@ -730,7 +738,7 @@ class LocalActorRef private[akka](
if (!isBeingRestarted) {
if (!isRunning) _dispatcher = md
else throw new ActorInitializationException(
"Can not swap dispatcher for " + toString + " after it has been started")
"Can not swap dispatcher for " + toString + " after it has been started")
}
}
@ -828,8 +836,8 @@ class LocalActorRef private[akka](
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
if(remoteAddress.isDefined)
RemoteClientModule.unregister(remoteAddress.get, uuid)
if (remoteAddress.isDefined)
RemoteClientModule.unregister(remoteAddress.get, uuid)
RemoteServerModule.unregister(this)
}
nullOutActorRefReferencesFor(actorInstance.get)
@ -872,7 +880,7 @@ class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef):Unit = guard.withGuard {
def startLink(actorRef: ActorRef): Unit = guard.withGuard {
try {
link(actorRef)
} finally {
@ -954,7 +962,7 @@ class LocalActorRef private[akka](
*/
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef):AnyRef = { _mailbox = value; value }
protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value }
/**
* Shuts down and removes all linked actors.
@ -986,10 +994,10 @@ class LocalActorRef private[akka](
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message)
if (remoteAddress.isDefined && isRemotingEnabled) {
@ -999,7 +1007,7 @@ class LocalActorRef private[akka](
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](timeout)
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
dispatcher dispatch invocation
@ -1021,7 +1029,8 @@ class LocalActorRef private[akka](
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
} finally {
}
finally {
currentMessage = null //TODO: Don't reset this, we might want to resend the message
}
}
@ -1047,15 +1056,15 @@ class LocalActorRef private[akka](
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around
val tooManyRestarts = if (maxNrOfRetries.isDefined) {
maxNrOfRetriesCount += 1
maxNrOfRetriesCount > maxNrOfRetries.get
} else false
maxNrOfRetriesCount += 1
maxNrOfRetriesCount > maxNrOfRetries.get
} else false
val restartingHasExpired = if (withinTimeRange.isDefined)
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get
else false
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get
else false
if (tooManyRestarts || restartingHasExpired) {
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
@ -1088,8 +1097,8 @@ class LocalActorRef private[akka](
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
else restartActor(failedActor, reason)
_status = ActorRefStatus.RUNNING
}
}
@ -1149,27 +1158,27 @@ class LocalActorRef private[akka](
private def spawnButDoNotStart(clazz: Class[_ <: Actor]): ActorRef = Actor.actorOf(clazz.newInstance)
private[this] def newActor: Actor = {
Actor.actorRefInCreation.withValue(Some(this)){
isInInitialization = true
val actor = actorFactory match {
case Left(Some(clazz)) =>
import ReflectiveAccess.{createInstance,noParams,noArgs}
createInstance(clazz.asInstanceOf[Class[_]], noParams, noArgs).
getOrElse(throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
case Right(Some(factory)) =>
factory()
case _ =>
throw new ActorInitializationException(
"Can't create Actor, no Actor class or factory function in scope")
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
isInInitialization = false
actor
Actor.actorRefInCreation.withValue(Some(this)) {
isInInitialization = true
val actor = actorFactory match {
case Left(Some(clazz)) =>
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance(clazz.asInstanceOf[Class[_]], noParams, noArgs).
getOrElse(throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
case Right(Some(factory)) =>
factory()
case _ =>
throw new ActorInitializationException(
"Can't create Actor, no Actor class or factory function in scope")
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
isInInitialization = false
actor
}
}
@ -1181,15 +1190,15 @@ class LocalActorRef private[akka](
createNewTransactionSet
} else oldTxSet
Actor.log.trace("Joining transaction set [" + currentTxSet +
"];\n\tactor " + toString +
"\n\twith message [" + message + "]")
"];\n\tactor " + toString +
"\n\twith message [" + message + "]")
val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties
else currentTxSet.incParties(mtx, 1)
}
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.trace("Invoking actor with message: %s\n",messageHandle)
Actor.log.trace("Invoking actor with message: %s\n", messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
@ -1198,7 +1207,7 @@ class LocalActorRef private[akka](
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) {
Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString +
"\n\twith message " + messageHandle)
"\n\twith message " + messageHandle)
Some(createNewTransactionSet)
} else None
}
@ -1223,7 +1232,8 @@ class LocalActorRef private[akka](
message, topLevelTransaction)
case e: InterruptedException => {} // received message while actor is shutting down, ignore
case e => handleExceptionInDispatch(e, message, topLevelTransaction)
} finally {
}
finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
}
@ -1239,7 +1249,7 @@ class LocalActorRef private[akka](
"All linked actors have died permanently (they were all configured as TEMPORARY)" +
"\n\tshutting down and unlinking supervisor actor as well [%s].",
temporaryActor.id)
notifySupervisorWithMessage(UnlinkAndStop(this))
notifySupervisorWithMessage(UnlinkAndStop(this))
}
}
@ -1274,8 +1284,8 @@ class LocalActorRef private[akka](
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup =>
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
shutdownLinkedActors
stop
shutdownLinkedActors
stop
} else sup ! notification // else notify supervisor
}
}
@ -1287,8 +1297,8 @@ class LocalActorRef private[akka](
private def findActorSelfField(clazz: Class[_]): Tuple2[Field, Field] = {
try {
val selfField = clazz.getDeclaredField("self")
val someSelfField = clazz.getDeclaredField("someSelf")
val selfField = clazz.getDeclaredField("self")
val someSelfField = clazz.getDeclaredField("someSelf")
selfField.setAccessible(true)
someSelfField.setAccessible(true)
(selfField, someSelfField)
@ -1310,7 +1320,7 @@ class LocalActorRef private[akka](
checkReceiveTimeout
}
/*
/*
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
@ -1359,7 +1369,7 @@ private[akka] case class RemoteActorRef private[akka] (
val port: Int,
_timeout: Long,
loader: Option[ClassLoader],
val actorType: ActorType = ActorType.ScalaActor)
val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
ensureRemotingEnabled
@ -1375,10 +1385,10 @@ private[akka] case class RemoteActorRef private[akka] (
message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
if (future.isDefined) future.get
@ -1425,7 +1435,7 @@ private[akka] case class RemoteActorRef private[akka] (
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
@ -1467,26 +1477,27 @@ trait ActorRefShared {
*/
trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/**
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
def id: String
/**
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
def id: String
def id_=(id: String): Unit
def id_=(id: String): Unit
/**
/**
* User overridable callback/setting.
* <p/>
* Defines the life-cycle for a supervised actor.
*/
@volatile var lifeCycle: Option[LifeCycle] = None
@volatile
var lifeCycle: Option[LifeCycle] = None
/**
/**
* User overridable callback/setting.
*
* <p/>
@ -1510,8 +1521,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
@volatile var trapExit: List[Class[_ <: Throwable]] = Nil
@volatile
var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
@ -1527,8 +1538,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
@volatile var faultHandler: Option[FaultHandlingStrategy] = None
@volatile
var faultHandler: Option[FaultHandlingStrategy] = None
/**
* The reference sender Actor of the last received message.
@ -1550,7 +1561,6 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
else msg.senderFuture
}
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
@ -1587,7 +1597,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future)
else false
else false
try {
future.await
} catch {
@ -1598,10 +1608,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
if (future.exception.isDefined) throw future.exception.get
else future.result
} else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
@ -1637,7 +1646,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException(
def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
@ -1677,11 +1686,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
} else throw new IllegalActorStateException("No channel available")
}
/**
* Atomically create (from actor class) and start an actor.
*/
def spawn[T <: Actor : Manifest]: ActorRef =
def spawn[T <: Actor: Manifest]: ActorRef =
spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
@ -1689,10 +1697,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
ensureRemotingEnabled
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port)
}
/**
* Atomically create (from actor class), start and link an actor.
*/
@ -1702,16 +1709,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/**
* Atomically create (from actor class), start, link and make an actor remote.
*/
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = {
def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
ensureRemotingEnabled
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port)
}
}
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
def !(msg: T): Unit
}
}

View file

@ -32,15 +32,36 @@ object Config {
System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast")
val HOME = {
val systemHome = System.getenv("AKKA_HOME")
if ((systemHome eq null) || systemHome.length == 0 || systemHome == ".") {
val optionHome = System.getProperty("akka.home", "")
if (optionHome.length != 0) Some(optionHome)
else None
} else Some(systemHome)
val envHome = System.getenv("AKKA_HOME") match {
case null | "" | "." => None
case value => Some(value)
}
val systemHome = System.getProperty("akka.home") match {
case null | "" => None
case value => Some(value)
}
envHome orElse systemHome
}
val config = {
val confName = {
val envConf = System.getenv("AKKA_MODE") match {
case null | "" => None
case value => Some(value)
}
val systemConf = System.getProperty("akka.mode") match {
case null | "" => None
case value => Some(value)
}
(envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf")
}
if (System.getProperty("akka.config", "") != "") {
val configFile = System.getProperty("akka.config", "")
try {
@ -52,19 +73,9 @@ object Config {
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (getClass.getClassLoader.getResource("akka.conf") ne null) {
} else if (HOME.isDefined) {
try {
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
ConfigLogger.log.info("Config loaded from the application classpath.")
} catch {
case e: ParseException => throw new ConfigurationException(
"Can't load 'akka.conf' config file from application classpath," +
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (HOME.isDefined) {
try {
val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/akka.conf"
val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/" + confName
Configgy.configure(configFile)
ConfigLogger.log.info(
"AKKA_HOME is defined as [%s], config loaded from [%s].",
@ -73,18 +84,28 @@ object Config {
} catch {
case e: ParseException => throw new ConfigurationException(
"AKKA_HOME is defined as [" + HOME.get + "] " +
"\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/akka.conf]," +
"\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," +
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (getClass.getClassLoader.getResource(confName) ne null) {
try {
Configgy.configureFromResource(confName, getClass.getClassLoader)
ConfigLogger.log.info("Config [%s] loaded from the application classpath.",confName)
} catch {
case e: ParseException => throw new ConfigurationException(
"Can't load '" + confName + "' config file from application classpath," +
"\n\tdue to: " + e.toString)
}
Configgy.config
} else {
ConfigLogger.log.warning(
"\nCan't load 'akka.conf'." +
"\nOne of the three ways of locating the 'akka.conf' file needs to be defined:" +
"\nCan't load '" + confName + "'." +
"\nOne of the three ways of locating the '" + confName + "' file needs to be defined:" +
"\n\t1. Define the '-Dakka.config=...' system property option." +
"\n\t2. Put the 'akka.conf' file on the classpath." +
"\n\t2. Put the '" + confName + "' file on the classpath." +
"\n\t3. Define 'AKKA_HOME' environment variable pointing to the root of the Akka distribution." +
"\nI have no way of finding the 'akka.conf' configuration file." +
"\nI have no way of finding the '" + confName + "' configuration file." +
"\nUsing default values everywhere.")
CConfig.fromString("<akka></akka>") // default empty config
}
@ -92,7 +113,7 @@ object Config {
val CONFIG_VERSION = config.getString("akka.version", VERSION)
if (VERSION != CONFIG_VERSION) throw new ConfigurationException(
"Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
"Akka JAR version [" + VERSION + "] is different than the provided config version [" + CONFIG_VERSION + "]")
val TIME_UNIT = config.getString("akka.time-unit", "seconds")

View file

@ -45,6 +45,11 @@ trait ListenerManagement extends Logging {
*/
def hasListeners: Boolean = !listeners.isEmpty
/**
* Checks if a specfic listener is registered.
*/
def hasListener(listener: ActorRef): Boolean = listeners.contains(listener)
protected def notifyListeners(message: => Any) {
if (hasListeners) {
val msg = message

View file

@ -258,13 +258,8 @@ object ReflectiveAccess extends Logging {
ctor.setAccessible(true)
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: java.lang.reflect.InvocationTargetException =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName)
None
case e: Exception =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName)
log.debug(e, "Could not instantiate class [%s] due to [%s]", clazz.getName, e.getMessage)
None
}
@ -280,13 +275,8 @@ object ReflectiveAccess extends Logging {
ctor.setAccessible(true)
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: java.lang.reflect.InvocationTargetException =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString)
None
case e: Exception =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString)
log.debug(e, "Could not instantiate class [%s] due to [%s]", fqn, e.getMessage)
None
}
@ -297,13 +287,8 @@ object ReflectiveAccess extends Logging {
instance.setAccessible(true)
Option(instance.get(null).asInstanceOf[T])
} catch {
case e: java.lang.reflect.InvocationTargetException =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s]", fqn)
None
case e: Exception =>
e.printStackTrace
log.error(e.getCause, "Could not instantiate class [%s]", fqn)
log.debug(e, "Could not get object [%s] due to [%s]", fqn, e.getMessage)
None
}

View file

@ -5,23 +5,27 @@
package se.scalablesolutions.akka.comet
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.Dispatchers
import org.atmosphere.jersey.util.JerseyBroadcasterUtil
object AkkaBroadcaster {
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
type Event = AtmosphereResourceEvent[_,_]
type Resource = AtmosphereResource[_,_]
}
class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster {
import AkkaBroadcaster._
name = classOf[AkkaBroadcaster].getName
//FIXME should be supervised
val caster = actorOf(new Actor {
lazy val caster = actorOf(new Actor {
self.dispatcher = broadcasterDispatcher
def receive = {
case f : Function0[_] => f()
case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e)
}
}).start
@ -30,7 +34,7 @@ class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
caster.stop
}
protected override def broadcast(r : AtmosphereResource[_,_], e : AtmosphereResourceEvent[_,_]) = {
caster ! (() => super.broadcast(r,e))
protected override def broadcast(r: Resource, e : Event) {
caster ! ((r,e))
}
}
}

View file

@ -91,7 +91,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Versions
// -------------------------------------------------------------------------------------------------------------------
lazy val ATMO_VERSION = "0.6.1"
lazy val ATMO_VERSION = "0.6.2"
lazy val CAMEL_VERSION = "2.4.0"
lazy val CASSANDRA_VERSION = "0.6.1"
lazy val DISPATCH_VERSION = "0.7.4"