Merge pull request #538 from akka/wip-replace-receive-√

Wip replace receive √
This commit is contained in:
viktorklang 2012-06-18 01:38:20 -07:00
commit 8d6fed6866
6 changed files with 41 additions and 53 deletions

View file

@ -7,7 +7,6 @@ package akka.actor
import akka.AkkaException import akka.AkkaException
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.collection.immutable.Stack
import java.util.regex.Pattern import java.util.regex.Pattern
/** /**
@ -279,18 +278,14 @@ trait Actor {
*/ */
protected[akka] implicit val context: ActorContext = { protected[akka] implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
def noContextError =
throw new ActorInitializationException( throw new ActorInitializationException(
"\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." + "\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." +
"\n\tYou have to use one of the factory methods to create a new actor. Either use:" + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" +
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)") "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)")
if (contextStack.isEmpty) noContextError
val c = contextStack.head val c = contextStack.head
if (c eq null) noContextError ActorCell.contextStack.set(null :: contextStack)
ActorCell.contextStack.set(contextStack.push(null))
c c
} }

View file

@ -13,7 +13,7 @@ import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream } import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap } import collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal } import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
//TODO: everything here for current compatibility - could be limited more //TODO: everything here for current compatibility - could be limited more
@ -173,8 +173,8 @@ trait UntypedActorContext extends ActorContext {
* for! (waves hand) * for! (waves hand)
*/ */
private[akka] object ActorCell { private[akka] object ActorCell {
val contextStack = new ThreadLocal[Stack[ActorContext]] { val contextStack = new ThreadLocal[List[ActorContext]] {
override def initialValue = Stack[ActorContext]() override def initialValue: List[ActorContext] = Nil
} }
final val emptyCancellable: Cancellable = new Cancellable { final val emptyCancellable: Cancellable = new Cancellable {
@ -184,7 +184,7 @@ private[akka] object ActorCell {
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
@ -408,7 +408,7 @@ private[akka] class ActorCell(
var currentMessage: Envelope = _ var currentMessage: Envelope = _
var actor: Actor = _ var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status @volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
var nextNameSequence: Long = 0 var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet var watching: Set[ActorRef] = emptyActorRefSet
@ -511,25 +511,21 @@ private[akka] class ActorCell(
//This method is in charge of setting up the contextStack and create a new instance of the Actor //This method is in charge of setting up the contextStack and create a new instance of the Actor
protected def newActor(): Actor = { protected def newActor(): Actor = {
contextStack.set(contextStack.get.push(this)) contextStack.set(this :: contextStack.get)
try { try {
import ActorCell.behaviorStackPlaceHolder behaviorStack = emptyBehaviorStack
behaviorStack = behaviorStackPlaceHolder
val instance = props.creator.apply() val instance = props.creator.apply()
if (instance eq null) if (instance eq null)
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'") throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
behaviorStack = behaviorStack match { // If no becomes were issued, the actors behavior is its receive method
case `behaviorStackPlaceHolder` Stack.empty.push(instance.receive) behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
case newBehaviors Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1))
}
instance instance
} finally { } finally {
val stackAfter = contextStack.get val stackAfter = contextStack.get
if (stackAfter.nonEmpty) if (stackAfter.nonEmpty)
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
} }
} }
@ -683,10 +679,8 @@ private[akka] class ActorCell(
} }
} }
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = { def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
if (discardOld) unbecome() behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
behaviorStack = behaviorStack.push(behavior)
}
/** /**
* UntypedActorContext impl * UntypedActorContext impl
@ -701,8 +695,9 @@ private[akka] class ActorCell(
def unbecome(): Unit = { def unbecome(): Unit = {
val original = behaviorStack val original = behaviorStack
val popped = original.pop behaviorStack =
behaviorStack = if (popped.isEmpty) original else popped if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
} }
def autoReceiveMessage(msg: Envelope): Unit = { def autoReceiveMessage(msg: Envelope): Unit = {
@ -761,7 +756,7 @@ private[akka] class ActorCell(
if (system.settings.DebugLifecycle) if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped")) system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally { } finally {
behaviorStack = behaviorStackPlaceHolder behaviorStack = emptyBehaviorStack
clearActorFields(a) clearActorFields(a)
actor = null actor = null
} }

View file

@ -13,7 +13,6 @@ import java.io.Closeable
import akka.dispatch.Await.{ Awaitable, CanAwait } import akka.dispatch.Await.{ Awaitable, CanAwait }
import akka.util._ import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import collection.immutable.Stack
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
@ -685,8 +684,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
final class TerminationCallbacks extends Runnable with Awaitable[Unit] { final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
private val lock = new ReentrantGuard private val lock = new ReentrantGuard
private var callbacks: Stack[Runnable] = _ //non-volatile since guarded by the lock private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock
lock withGuard { callbacks = Stack.empty[Runnable] } lock withGuard { callbacks = Nil }
private val latch = new CountDownLatch(1) private val latch = new CountDownLatch(1)
@ -695,17 +694,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case 0 throw new RejectedExecutionException("Must be called prior to system shutdown.") case 0 throw new RejectedExecutionException("Must be called prior to system shutdown.")
case _ lock withGuard { case _ lock withGuard {
if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.") if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.")
else callbacks = callbacks.push(callback) else callbacks ::= callback
} }
} }
} }
final def run(): Unit = lock withGuard { final def run(): Unit = lock withGuard {
@tailrec def runNext(c: Stack[Runnable]): Stack[Runnable] = c.headOption match { @tailrec def runNext(c: List[Runnable]): List[Runnable] = c match {
case None Stack.empty[Runnable] case Nil Nil
case Some(callback) case callback :: rest
try callback.run() catch { case e log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) } try callback.run() catch { case NonFatal(e) log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext(c.pop) runNext(rest)
} }
try { callbacks = runNext(callbacks) } finally latch.countDown() try { callbacks = runNext(callbacks) } finally latch.countDown()
} }

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.dispatch._ import akka.dispatch._
import akka.japi.Creator import akka.japi.Creator
import collection.immutable.Stack
import akka.routing._ import akka.routing._
/** /**

View file

@ -5,9 +5,7 @@
package akka.testkit package akka.testkit
import akka.actor._ import akka.actor._
import akka.util.Duration
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.Stack
import akka.dispatch._ import akka.dispatch._
import akka.pattern.ask import akka.pattern.ask

View file

@ -486,19 +486,21 @@ trait TestKitBase {
@tailrec @tailrec
def doit(acc: List[T], count: Int): List[T] = { def doit(acc: List[T], count: Int): List[T] = {
if (count >= messages) return acc.reverse if (count >= messages) acc.reverse
receiveOne((stop - now) min idle) else {
lastMessage match { receiveOne((stop - now) min idle)
case NullMessage lastMessage match {
lastMessage = msg case NullMessage
acc.reverse lastMessage = msg
case RealMessage(o, _) if (f isDefinedAt o) acc.reverse
msg = lastMessage case RealMessage(o, _) if (f isDefinedAt o)
doit(f(o) :: acc, count + 1) msg = lastMessage
case RealMessage(o, _) doit(f(o) :: acc, count + 1)
queue.offerFirst(lastMessage) case RealMessage(o, _)
lastMessage = msg queue.offerFirst(lastMessage)
acc.reverse lastMessage = msg
acc.reverse
}
} }
} }