Fixed problem with ordering when invoking self.start from within Actor

This commit is contained in:
Jonas Bonér 2010-05-19 21:08:02 +02:00
parent b36dc5cf79
commit 656e65cffa
3 changed files with 14 additions and 9 deletions

View file

@ -228,11 +228,12 @@ object Actor extends Logging {
def spawn(body: => Unit): Unit = {
case object Spawn
actorOf(new Actor() {
self.start
self ! Spawn
def receive = {
case Spawn => body; self.stop
}
}).start
})
}
}

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction.Global._
@ -716,6 +716,9 @@ sealed class LocalActorRef private[akka](
if (isShutdown) throw new ActorStartException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
_isRunning = true
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
@ -979,7 +982,11 @@ sealed class LocalActorRef private[akka](
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized {
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = actor.synchronized {
if (isShutdown) {
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
try {
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
@ -1172,15 +1179,12 @@ sealed class LocalActorRef private[akka](
}
}
private def initializeActorInstance = if (!isRunning) {
dispatcher.register(this)
dispatcher.start
private def initializeActorInstance = {
actor.init // run actor init and initTransactionalState callbacks
actor.initTransactionalState
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
_isRunning = true
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {

View file

@ -14,7 +14,6 @@ import Actor._
*/
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
self.id = "SlowActor"
def receive = {
@ -26,7 +25,6 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
}
class FastActor(finishedCounter: CountDownLatch) extends Actor {
self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
self.id = "FastActor"
def receive = {
@ -55,6 +53,8 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
// now assert that f is finished while s is still busy
fFinished.await
assert(sFinished.getCount > 0)
sFinished.await
assert(sFinished.getCount === 0)
f.stop
s.stop
}