diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index e6ee2a4c66..6bb64363f3 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -6,11 +6,11 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} +import se.scalablesolutions.akka.config.{ AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy } import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ -import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} +import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException } import se.scalablesolutions.akka.AkkaException import se.scalablesolutions.akka.util._ import ReflectiveAccess._ @@ -22,16 +22,15 @@ import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit} -import java.util.{Map => JMap} +import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } +import java.util.{ Map => JMap } import java.lang.reflect.Field import scala.reflect.BeanProperty - object ActorRefStatus { - /** LifeCycles for ActorRefs - */ + /** LifeCycles for ActorRefs + */ private[akka] sealed trait StatusType object UNSTARTED extends StatusType object RUNNING extends StatusType @@ -71,17 +70,17 @@ object ActorRefStatus { * * @author Jonas Bonér */ -trait ActorRef extends - ActorRefShared with - TransactionManagement with - Logging with - java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => +trait ActorRef extends ActorRefShared with TransactionManagement with Logging with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => // Only mutable for RemoteServer in order to maintain identity across nodes - @volatile protected[akka] var _uuid = newUuid - @volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED - @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) - @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None + @volatile + protected[akka] var _uuid = newUuid + @volatile + protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED + @volatile + protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) + @volatile + protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None protected[akka] val guard = new ReentrantGuard /** @@ -94,7 +93,9 @@ trait ActorRef extends * that you can use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. */ - @BeanProperty @volatile var id: String = _uuid.toString + @BeanProperty + @volatile + var id: String = _uuid.toString /** * User overridable callback/setting. @@ -102,7 +103,9 @@ trait ActorRef extends * Defines the default timeout for '!!' and '!!!' invocations, * e.g. the timeout for the future returned by the call to '!!' and '!!!'. */ - @BeanProperty @volatile var timeout: Long = Actor.TIMEOUT + @BeanProperty + @volatile + var timeout: Long = Actor.TIMEOUT /** * User overridable callback/setting. @@ -110,7 +113,8 @@ trait ActorRef extends * Defines the default timeout for an initial receive invocation. * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ - @volatile var receiveTimeout: Option[Long] = None + @volatile + var receiveTimeout: Option[Long] = None /** * Akka Java API @@ -162,8 +166,8 @@ trait ActorRef extends def setLifeCycle(lifeCycle: LifeCycle) = this.lifeCycle = Some(lifeCycle) def getLifeCycle(): Option[LifeCycle] = lifeCycle - - @volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher + @volatile + private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher /** * Akka Java API @@ -180,11 +184,11 @@ trait ActorRef extends def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher def getDispatcher(): MessageDispatcher = dispatcher - /** * Holds the hot swapped partial function. */ - @volatile protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack + @volatile + protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack /** * User overridable callback/setting. @@ -192,22 +196,26 @@ trait ActorRef extends * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should * start if there is no one running, else it joins the existing transaction. */ - @volatile protected[akka] var isTransactor = false + @volatile + protected[akka] var isTransactor = false /** * Configuration for TransactionFactory. User overridable. */ - @volatile protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig + @volatile + protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig /** * TransactionFactory to be used for atomic when isTransactor. Configuration is overridable. */ - @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None + @volatile + private[akka] var _transactionFactory: Option[TransactionFactory] = None /** * This is a reference to the message currently being processed by the actor */ - @volatile protected[akka] var currentMessage: MessageInvocation = null + @volatile + protected[akka] var currentMessage: MessageInvocation = null /** * Comparison only takes uuid into account. @@ -276,7 +284,7 @@ trait ActorRef extends * *

*/ - def sendOneWay(message: AnyRef): Unit = sendOneWay(message,null) + def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null) /** * Akka Java API @@ -296,14 +304,14 @@ trait ActorRef extends * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) * Uses the defualt timeout of the Actor (setTimeout()) and omits the sender reference */ - def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message,timeout,null) + def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) /** * Akka Java API * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) * Uses the defualt timeout of the Actor (setTimeout()) */ - def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message,timeout,sender) + def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender) /** * Akka Java API @@ -320,13 +328,13 @@ trait ActorRef extends * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = { - !!(message,timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( + !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tsent to [" + actorClassName + - "]\n\tfrom [" + (if(sender ne null) sender.actorClassName else "nowhere") + + "]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") + "]\n\twith timeout [" + timeout + "]\n\ttimed out.")) - .asInstanceOf[AnyRef] + .asInstanceOf[AnyRef] } /** @@ -334,14 +342,14 @@ trait ActorRef extends * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) and omits the sender */ - def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message,timeout,null) + def sendRequestReplyFuture(message: AnyRef): Future[_] = sendRequestReplyFuture(message, timeout, null) /** * Akka Java API * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) */ - def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message,timeout,sender) + def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] = sendRequestReplyFuture(message, timeout, sender) /** * Akka Java API @@ -354,16 +362,15 @@ trait ActorRef extends * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message,timeout)(Option(sender)) + def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[_] = !!!(message, timeout)(Option(sender)) /** * Akka Java API * Forwards the message specified to this actor and preserves the original sender of the message */ def forward(message: AnyRef, sender: ActorRef): Unit = - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else forward(message)(Some(sender)) - + if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") + else forward(message)(Some(sender)) /** * Akka Java API @@ -394,7 +401,6 @@ trait ActorRef extends */ def getActorClass(): Class[_ <: Actor] = actorClass - /** * Returns the class name for the Actor instance that is managed by the ActorRef. */ @@ -443,7 +449,6 @@ trait ActorRef extends */ def setTransactionConfig(config: TransactionConfig): Unit = transactionConfig = config - /** * Get the transaction configuration for this actor. */ @@ -455,7 +460,6 @@ trait ActorRef extends */ def getTransactionConfig(): TransactionConfig = transactionConfig - /** * Returns the home address and port for this actor. */ @@ -477,8 +481,7 @@ trait ActorRef extends * Akka Java API * Set the home address and port for this actor. */ - def setHomeAddress(hostname: String, port: Int): Unit = homeAddress = (hostname,port) - + def setHomeAddress(hostname: String, port: Int): Unit = homeAddress = (hostname, port) /** * Set the home address and port for this actor. @@ -491,7 +494,6 @@ trait ActorRef extends */ def setHomeAddress(address: InetSocketAddress): Unit = homeAddress = address - /** * Returns the remote address for the actor, if any, else None. */ @@ -504,7 +506,6 @@ trait ActorRef extends */ def getRemoteAddress(): Option[InetSocketAddress] = remoteAddress - /** * Starts up the actor and its message queue. */ @@ -567,7 +568,7 @@ trait ActorRef extends */ def spawnLink(clazz: Class[_ <: Actor]): ActorRef - /** + /** * Atomically create (from actor class), make it remote, link and start an actor. *

* To be invoked from within the actor itself. @@ -601,10 +602,10 @@ trait ActorRef extends protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] protected[akka] def actorInstance: AtomicReference[Actor] @@ -631,7 +632,7 @@ trait ActorRef extends override def equals(that: Any): Boolean = { that.isInstanceOf[ActorRef] && - that.asInstanceOf[ActorRef].uuid == uuid + that.asInstanceOf[ActorRef].uuid == uuid } override def toString = "Actor[" + id + ":" + uuid + "]" @@ -645,7 +646,7 @@ trait ActorRef extends } protected[akka] def cancelReceiveTimeout = { - if(_futureTimeout.isDefined) { + if (_futureTimeout.isDefined) { _futureTimeout.get.cancel(true) _futureTimeout = None log.debug("Timeout canceled for %s", this) @@ -658,17 +659,24 @@ trait ActorRef extends * * @author Jonas Bonér */ -class LocalActorRef private[akka]( +class LocalActorRef private[akka] ( private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) extends ActorRef with ScalaActorRef { - @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes - @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None - @volatile private[akka] var _supervisor: Option[ActorRef] = None - @volatile private var isInInitialization = false - @volatile private var maxNrOfRetriesCount: Int = 0 - @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L - @volatile private var _mailbox: AnyRef = _ + @volatile + private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes + @volatile + private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None + @volatile + private[akka] var _supervisor: Option[ActorRef] = None + @volatile + private var isInInitialization = false + @volatile + private var maxNrOfRetriesCount: Int = 0 + @volatile + private var restartsWithinTimeRangeTimestamp: Long = 0L + @volatile + private var _mailbox: AnyRef = _ protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } @@ -680,36 +688,36 @@ class LocalActorRef private[akka]( if (isRunning) initializeActorInstance private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) - private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) + private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) // used only for deserialization private[akka] def this(__uuid: Uuid, - __id: String, - __hostname: String, - __port: Int, - __isTransactor: Boolean, - __timeout: Long, - __receiveTimeout: Option[Long], - __lifeCycle: Option[LifeCycle], - __supervisor: Option[ActorRef], - __hotswap: Option[PartialFunction[Any, Unit]], - __factory: () => Actor) = { - this(__factory) - _uuid = __uuid - id = __id - homeAddress = (__hostname, __port) - isTransactor = __isTransactor - timeout = __timeout - receiveTimeout = __receiveTimeout - lifeCycle = __lifeCycle - _supervisor = __supervisor - hotswap = __hotswap - actorSelfFields._1.set(actor, this) - actorSelfFields._2.set(actor, Some(this)) - start - checkReceiveTimeout - ActorRegistry.register(this) - } + __id: String, + __hostname: String, + __port: Int, + __isTransactor: Boolean, + __timeout: Long, + __receiveTimeout: Option[Long], + __lifeCycle: Option[LifeCycle], + __supervisor: Option[ActorRef], + __hotswap: Option[PartialFunction[Any, Unit]], + __factory: () => Actor) = { + this(__factory) + _uuid = __uuid + id = __id + homeAddress = (__hostname, __port) + isTransactor = __isTransactor + timeout = __timeout + receiveTimeout = __receiveTimeout + lifeCycle = __lifeCycle + _supervisor = __supervisor + hotswap = __hotswap + actorSelfFields._1.set(actor, this) + actorSelfFields._2.set(actor, Some(this)) + start + checkReceiveTimeout + ActorRegistry.register(this) + } // ========= PUBLIC FUNCTIONS ========= @@ -730,7 +738,7 @@ class LocalActorRef private[akka]( if (!isBeingRestarted) { if (!isRunning) _dispatcher = md else throw new ActorInitializationException( - "Can not swap dispatcher for " + toString + " after it has been started") + "Can not swap dispatcher for " + toString + " after it has been started") } } @@ -828,8 +836,8 @@ class LocalActorRef private[akka]( actor.postStop ActorRegistry.unregister(this) if (isRemotingEnabled) { - if(remoteAddress.isDefined) - RemoteClientModule.unregister(remoteAddress.get, uuid) + if (remoteAddress.isDefined) + RemoteClientModule.unregister(remoteAddress.get, uuid) RemoteServerModule.unregister(this) } nullOutActorRefReferencesFor(actorInstance.get) @@ -872,7 +880,7 @@ class LocalActorRef private[akka]( *

* To be invoked from within the actor itself. */ - def startLink(actorRef: ActorRef):Unit = guard.withGuard { + def startLink(actorRef: ActorRef): Unit = guard.withGuard { try { link(actorRef) } finally { @@ -954,7 +962,7 @@ class LocalActorRef private[akka]( */ def mailbox: AnyRef = _mailbox - protected[akka] def mailbox_=(value: AnyRef):AnyRef = { _mailbox = value; value } + protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } /** * Shuts down and removes all linked actors. @@ -986,10 +994,10 @@ class LocalActorRef private[akka]( } protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { joinTransaction(message) if (remoteAddress.isDefined && isRemotingEnabled) { @@ -999,7 +1007,7 @@ class LocalActorRef private[akka]( else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](timeout) + else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) dispatcher dispatch invocation @@ -1021,7 +1029,8 @@ class LocalActorRef private[akka]( case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e - } finally { + } + finally { currentMessage = null //TODO: Don't reset this, we might want to resend the message } } @@ -1047,15 +1056,15 @@ class LocalActorRef private[akka]( protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around - + val tooManyRestarts = if (maxNrOfRetries.isDefined) { - maxNrOfRetriesCount += 1 - maxNrOfRetriesCount > maxNrOfRetries.get - } else false + maxNrOfRetriesCount += 1 + maxNrOfRetriesCount > maxNrOfRetries.get + } else false val restartingHasExpired = if (withinTimeRange.isDefined) - (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get - else false + (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get + else false if (tooManyRestarts || restartingHasExpired) { val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) @@ -1088,8 +1097,8 @@ class LocalActorRef private[akka]( Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) - + else restartActor(failedActor, reason) + _status = ActorRefStatus.RUNNING } } @@ -1149,27 +1158,27 @@ class LocalActorRef private[akka]( private def spawnButDoNotStart(clazz: Class[_ <: Actor]): ActorRef = Actor.actorOf(clazz.newInstance) private[this] def newActor: Actor = { - Actor.actorRefInCreation.withValue(Some(this)){ - isInInitialization = true - val actor = actorFactory match { - case Left(Some(clazz)) => - import ReflectiveAccess.{createInstance,noParams,noArgs} - createInstance(clazz.asInstanceOf[Class[_]], noParams, noArgs). - getOrElse(throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException( - "Can't create Actor, no Actor class or factory function in scope") - } - if (actor eq null) throw new ActorInitializationException( - "Actor instance passed to ActorRef can not be 'null'") - isInInitialization = false - actor + Actor.actorRefInCreation.withValue(Some(this)) { + isInInitialization = true + val actor = actorFactory match { + case Left(Some(clazz)) => + import ReflectiveAccess.{ createInstance, noParams, noArgs } + createInstance(clazz.asInstanceOf[Class[_]], noParams, noArgs). + getOrElse(throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) + case Right(Some(factory)) => + factory() + case _ => + throw new ActorInitializationException( + "Can't create Actor, no Actor class or factory function in scope") + } + if (actor eq null) throw new ActorInitializationException( + "Actor instance passed to ActorRef can not be 'null'") + isInInitialization = false + actor } } @@ -1181,15 +1190,15 @@ class LocalActorRef private[akka]( createNewTransactionSet } else oldTxSet Actor.log.trace("Joining transaction set [" + currentTxSet + - "];\n\tactor " + toString + - "\n\twith message [" + message + "]") + "];\n\tactor " + toString + + "\n\twith message [" + message + "]") val mtx = ThreadLocalTransaction.getThreadLocalTransaction if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties else currentTxSet.incParties(mtx, 1) } private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.trace("Invoking actor with message: %s\n",messageHandle) + Actor.log.trace("Invoking actor with message: %s\n", messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = @@ -1198,7 +1207,7 @@ class LocalActorRef private[akka]( topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx if (isTransactor) { Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + - "\n\twith message " + messageHandle) + "\n\twith message " + messageHandle) Some(createNewTransactionSet) } else None } @@ -1223,7 +1232,8 @@ class LocalActorRef private[akka]( message, topLevelTransaction) case e: InterruptedException => {} // received message while actor is shutting down, ignore case e => handleExceptionInDispatch(e, message, topLevelTransaction) - } finally { + } + finally { clearTransaction if (topLevelTransaction) clearTransactionSet } @@ -1239,7 +1249,7 @@ class LocalActorRef private[akka]( "All linked actors have died permanently (they were all configured as TEMPORARY)" + "\n\tshutting down and unlinking supervisor actor as well [%s].", temporaryActor.id) - notifySupervisorWithMessage(UnlinkAndStop(this)) + notifySupervisorWithMessage(UnlinkAndStop(this)) } } @@ -1274,8 +1284,8 @@ class LocalActorRef private[akka]( // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client _supervisor.foreach { sup => if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - shutdownLinkedActors - stop + shutdownLinkedActors + stop } else sup ! notification // else notify supervisor } } @@ -1287,8 +1297,8 @@ class LocalActorRef private[akka]( private def findActorSelfField(clazz: Class[_]): Tuple2[Field, Field] = { try { - val selfField = clazz.getDeclaredField("self") - val someSelfField = clazz.getDeclaredField("someSelf") + val selfField = clazz.getDeclaredField("self") + val someSelfField = clazz.getDeclaredField("someSelf") selfField.setAccessible(true) someSelfField.setAccessible(true) (selfField, someSelfField) @@ -1310,7 +1320,7 @@ class LocalActorRef private[akka]( checkReceiveTimeout } -/* + /* private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { if (!message.isInstanceOf[String] && !message.isInstanceOf[Byte] && @@ -1359,7 +1369,7 @@ private[akka] case class RemoteActorRef private[akka] ( val port: Int, _timeout: Long, loader: Option[ClassLoader], - val actorType: ActorType = ActorType.ScalaActor) + val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { ensureRemotingEnabled @@ -1375,10 +1385,10 @@ private[akka] case class RemoteActorRef private[akka] ( message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = RemoteClientModule.send[T]( message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType) if (future.isDefined) future.get @@ -1425,7 +1435,7 @@ private[akka] case class RemoteActorRef private[akka] ( def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: AnyRef = unsupported - protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported + protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported @@ -1467,26 +1477,27 @@ trait ActorRefShared { */ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => - /** - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - *

- * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - def id: String + /** + * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. + *

+ * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote + * actor in RemoteServer etc.But also as the identifier for persistence, which means + * that you can use a custom name to be able to retrieve the "correct" persisted state + * upon restart, remote restart etc. + */ + def id: String - def id_=(id: String): Unit + def id_=(id: String): Unit - /** + /** * User overridable callback/setting. *

* Defines the life-cycle for a supervised actor. */ - @volatile var lifeCycle: Option[LifeCycle] = None + @volatile + var lifeCycle: Option[LifeCycle] = None - /** + /** * User overridable callback/setting. * *

@@ -1510,8 +1521,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) * */ - @volatile var trapExit: List[Class[_ <: Throwable]] = Nil - + @volatile + var trapExit: List[Class[_ <: Throwable]] = Nil /** * User overridable callback/setting. @@ -1527,8 +1538,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) * */ - @volatile var faultHandler: Option[FaultHandlingStrategy] = None - + @volatile + var faultHandler: Option[FaultHandlingStrategy] = None /** * The reference sender Actor of the last received message. @@ -1550,7 +1561,6 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => else msg.senderFuture } - /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *

@@ -1587,7 +1597,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => if (isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None) val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future) - else false + else false try { future.await } catch { @@ -1598,10 +1608,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => if (future.exception.isDefined) throw future.exception.get else future.result } else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + "Actor has not been started, you need to invoke 'actor.start' before using it") } - /** * Sends a message asynchronously returns a future holding the eventual reply message. *

@@ -1637,7 +1646,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => *

* Throws an IllegalStateException if unable to determine what to reply to. */ - def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException( + def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException( "\n\tNo sender in scope, can't reply. " + "\n\tYou have probably: " + "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + @@ -1677,11 +1686,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => } else throw new IllegalActorStateException("No channel available") } - /** * Atomically create (from actor class) and start an actor. */ - def spawn[T <: Actor : Manifest]: ActorRef = + def spawn[T <: Actor: Manifest]: ActorRef = spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** @@ -1689,10 +1697,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { ensureRemotingEnabled - spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port) + spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) } - /** * Atomically create (from actor class), start and link an actor. */ @@ -1702,16 +1709,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => /** * Atomically create (from actor class), start, link and make an actor remote. */ - def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = { + def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { ensureRemotingEnabled - spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port) + spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port) } } - /** * Abstraction for unification of sender and senderFuture for later reply */ abstract class Channel[T] { def !(msg: T): Unit -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index 7a4ac4be48..e97347754b 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -32,15 +32,36 @@ object Config { System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast") val HOME = { - val systemHome = System.getenv("AKKA_HOME") - if ((systemHome eq null) || systemHome.length == 0 || systemHome == ".") { - val optionHome = System.getProperty("akka.home", "") - if (optionHome.length != 0) Some(optionHome) - else None - } else Some(systemHome) + val envHome = System.getenv("AKKA_HOME") match { + case null | "" | "." => None + case value => Some(value) + } + + val systemHome = System.getProperty("akka.home") match { + case null | "" => None + case value => Some(value) + } + + envHome orElse systemHome } val config = { + + val confName = { + + val envConf = System.getenv("AKKA_MODE") match { + case null | "" => None + case value => Some(value) + } + + val systemConf = System.getProperty("akka.mode") match { + case null | "" => None + case value => Some(value) + } + + (envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf") + } + if (System.getProperty("akka.config", "") != "") { val configFile = System.getProperty("akka.config", "") try { @@ -52,19 +73,9 @@ object Config { "\n\tdue to: " + e.toString) } Configgy.config - } else if (getClass.getClassLoader.getResource("akka.conf") ne null) { + } else if (HOME.isDefined) { try { - Configgy.configureFromResource("akka.conf", getClass.getClassLoader) - ConfigLogger.log.info("Config loaded from the application classpath.") - } catch { - case e: ParseException => throw new ConfigurationException( - "Can't load 'akka.conf' config file from application classpath," + - "\n\tdue to: " + e.toString) - } - Configgy.config - } else if (HOME.isDefined) { - try { - val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/akka.conf" + val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/" + confName Configgy.configure(configFile) ConfigLogger.log.info( "AKKA_HOME is defined as [%s], config loaded from [%s].", @@ -73,18 +84,28 @@ object Config { } catch { case e: ParseException => throw new ConfigurationException( "AKKA_HOME is defined as [" + HOME.get + "] " + - "\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/akka.conf]," + + "\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," + + "\n\tdue to: " + e.toString) + } + Configgy.config + } else if (getClass.getClassLoader.getResource(confName) ne null) { + try { + Configgy.configureFromResource(confName, getClass.getClassLoader) + ConfigLogger.log.info("Config [%s] loaded from the application classpath.",confName) + } catch { + case e: ParseException => throw new ConfigurationException( + "Can't load '" + confName + "' config file from application classpath," + "\n\tdue to: " + e.toString) } Configgy.config } else { ConfigLogger.log.warning( - "\nCan't load 'akka.conf'." + - "\nOne of the three ways of locating the 'akka.conf' file needs to be defined:" + + "\nCan't load '" + confName + "'." + + "\nOne of the three ways of locating the '" + confName + "' file needs to be defined:" + "\n\t1. Define the '-Dakka.config=...' system property option." + - "\n\t2. Put the 'akka.conf' file on the classpath." + + "\n\t2. Put the '" + confName + "' file on the classpath." + "\n\t3. Define 'AKKA_HOME' environment variable pointing to the root of the Akka distribution." + - "\nI have no way of finding the 'akka.conf' configuration file." + + "\nI have no way of finding the '" + confName + "' configuration file." + "\nUsing default values everywhere.") CConfig.fromString("") // default empty config } @@ -92,7 +113,7 @@ object Config { val CONFIG_VERSION = config.getString("akka.version", VERSION) if (VERSION != CONFIG_VERSION) throw new ConfigurationException( - "Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]") + "Akka JAR version [" + VERSION + "] is different than the provided config version [" + CONFIG_VERSION + "]") val TIME_UNIT = config.getString("akka.time-unit", "seconds") diff --git a/akka-actor/src/main/scala/util/ListenerManagement.scala b/akka-actor/src/main/scala/util/ListenerManagement.scala index 7ad0f451f1..10104e119d 100644 --- a/akka-actor/src/main/scala/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/util/ListenerManagement.scala @@ -45,6 +45,11 @@ trait ListenerManagement extends Logging { */ def hasListeners: Boolean = !listeners.isEmpty + /** + * Checks if a specfic listener is registered. + */ + def hasListener(listener: ActorRef): Boolean = listeners.contains(listener) + protected def notifyListeners(message: => Any) { if (hasListeners) { val msg = message diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index b0ec31bef2..ce6727e8e8 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -258,13 +258,8 @@ object ReflectiveAccess extends Logging { ctor.setAccessible(true) Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { - case e: java.lang.reflect.InvocationTargetException => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName) - None case e: Exception => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName) + log.debug(e, "Could not instantiate class [%s] due to [%s]", clazz.getName, e.getMessage) None } @@ -280,13 +275,8 @@ object ReflectiveAccess extends Logging { ctor.setAccessible(true) Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { - case e: java.lang.reflect.InvocationTargetException => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString) - None case e: Exception => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString) + log.debug(e, "Could not instantiate class [%s] due to [%s]", fqn, e.getMessage) None } @@ -297,13 +287,8 @@ object ReflectiveAccess extends Logging { instance.setAccessible(true) Option(instance.get(null).asInstanceOf[T]) } catch { - case e: java.lang.reflect.InvocationTargetException => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s]", fqn) - None case e: Exception => - e.printStackTrace - log.error(e.getCause, "Could not instantiate class [%s]", fqn) + log.debug(e, "Could not get object [%s] due to [%s]", fqn, e.getMessage) None } diff --git a/akka-http/src/main/scala/AkkaBroadcaster.scala b/akka-http/src/main/scala/AkkaBroadcaster.scala index ca5abc6f1d..8aae04bc86 100644 --- a/akka-http/src/main/scala/AkkaBroadcaster.scala +++ b/akka-http/src/main/scala/AkkaBroadcaster.scala @@ -5,23 +5,27 @@ package se.scalablesolutions.akka.comet import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource} + import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.dispatch.Dispatchers +import org.atmosphere.jersey.util.JerseyBroadcasterUtil object AkkaBroadcaster { val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") + + type Event = AtmosphereResourceEvent[_,_] + type Resource = AtmosphereResource[_,_] } -class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster { +class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster { import AkkaBroadcaster._ - name = classOf[AkkaBroadcaster].getName //FIXME should be supervised - val caster = actorOf(new Actor { + lazy val caster = actorOf(new Actor { self.dispatcher = broadcasterDispatcher def receive = { - case f : Function0[_] => f() + case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e) } }).start @@ -30,7 +34,7 @@ class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster { caster.stop } - protected override def broadcast(r : AtmosphereResource[_,_], e : AtmosphereResourceEvent[_,_]) = { - caster ! (() => super.broadcast(r,e)) + protected override def broadcast(r: Resource, e : Event) { + caster ! ((r,e)) } -} +} \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 6bc95d3271..c21aba9267 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -91,7 +91,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // Versions // ------------------------------------------------------------------------------------------------------------------- - lazy val ATMO_VERSION = "0.6.1" + lazy val ATMO_VERSION = "0.6.2" lazy val CAMEL_VERSION = "2.4.0" lazy val CASSANDRA_VERSION = "0.6.1" lazy val DISPATCH_VERSION = "0.7.4"