Almost there... ActorRefSpec still has a failing test

This commit is contained in:
Viktor Klang 2011-09-20 18:34:21 +02:00
parent 48deb311fc
commit 9007b6e847
8 changed files with 189 additions and 101 deletions

View file

@ -11,11 +11,11 @@ import akka.testkit._
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import akka.config.Supervision.{ OneForOnePermanentStrategy }
import akka.dispatch.Future
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import java.lang.IllegalStateException
import akka.util.ReflectiveAccess
import akka.actor.Actor.actorOf
import akka.dispatch.{ DefaultPromise, Promise, Future }
object ActorRefSpec {
@ -115,6 +115,23 @@ object ActorRefSpec {
class ActorRefSpec extends WordSpec with MustMatchers {
import akka.actor.ActorRefSpec._
def promiseIntercept(f: Actor)(to: Promise[Actor]): Actor = try {
val r = f
to.completeWithResult(r)
r
} catch {
case e
to.completeWithException(e)
throw e
}
def wrap[T](f: Promise[Actor] T): T = {
val result = new DefaultPromise[Actor](10 * 60 * 1000)
val r = f(result)
result.get
r
}
"An ActorRef" must {
"not allow Actors to be created outside of an actorOf" in {
@ -123,10 +140,11 @@ class ActorRefSpec extends WordSpec with MustMatchers {
}
intercept[akka.actor.ActorInitializationException] {
wrap(result
actorOf(new Actor {
val nested = new Actor { def receive = { case _ } }
val nested = promiseIntercept(new Actor { def receive = { case _ } })(result)
def receive = { case _ }
})
}))
}
def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None
@ -134,69 +152,80 @@ class ActorRefSpec extends WordSpec with MustMatchers {
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingOuterActor(actorOf(new InnerActor)))
wrap(result
actorOf(promiseIntercept(new FailingOuterActor(actorOf(new InnerActor)))(result)))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new OuterActor(actorOf(new FailingInnerActor)))
wrap(result
actorOf(new OuterActor(actorOf(promiseIntercept(new FailingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingInheritingOuterActor(actorOf(new InnerActor)))
wrap(result
actorOf(promiseIntercept(new FailingInheritingOuterActor(actorOf(new InnerActor)))(result)))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingOuterActor(actorOf(new FailingInheritingInnerActor)))
wrap(result
actorOf(new FailingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingInheritingOuterActor(actorOf(new FailingInheritingInnerActor)))
wrap(result
actorOf(new FailingInheritingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingInheritingOuterActor(actorOf(new FailingInnerActor)))
wrap(result
actorOf(new FailingInheritingOuterActor(actorOf(promiseIntercept(new FailingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
wrap(result
actorOf(new OuterActor(actorOf(new InnerActor {
val a = new InnerActor
})))
val a = promiseIntercept(new InnerActor)(result)
}))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new FailingOuterActor(actorOf(new FailingInheritingInnerActor)))
wrap(result
actorOf(new FailingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new OuterActor(actorOf(new FailingInheritingInnerActor)))
wrap(result
actorOf(new OuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result)))))
}
contextStackMustBeEmpty
intercept[akka.actor.ActorInitializationException] {
actorOf(new OuterActor(actorOf({ new InnerActor; new InnerActor })))
wrap(result
actorOf(new OuterActor(actorOf(promiseIntercept({ new InnerActor; new InnerActor })(result)))))
}
contextStackMustBeEmpty
(intercept[java.lang.IllegalStateException] {
actorOf(new OuterActor(actorOf({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })))
wrap(result
actorOf(new OuterActor(actorOf(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result)))))
}).getMessage must be === "Ur state be b0rked"
contextStackMustBeEmpty

View file

@ -56,9 +56,8 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
}
case object Init extends AutoReceivedMessage with LifeCycleMessage
case class Death(deceased: ActorRef, cause: Throwable, recoverable: Boolean) extends AutoReceivedMessage with LifeCycleMessage
case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage
case class Crash(reason: Throwable) extends AutoReceivedMessage
case object RevertHotSwap extends AutoReceivedMessage
@ -657,14 +656,13 @@ trait Actor {
*/
msg match {
case Init reply(()); false //All gud nao FIXME remove reply when we can have fully async init
case HotSwap(code, discardOld) become(code(self), discardOld); false
case RevertHotSwap unbecome(); false
case d: Death context.handleDeath(d); false
case Link(child) self.link(child); false
case Unlink(child) self.unlink(child); false
case UnlinkAndStop(child) self.unlink(child); child.stop(); false
case Restart(reason) throw reason
case Crash(reason) throw reason
case Kill throw new ActorKilledException("Kill")
case PoisonPill
val ch = channel

View file

@ -49,12 +49,6 @@ private[akka] trait ActorContext {
}
private[akka] object ActorCell {
sealed trait Status
object Status {
object Running extends Status
object Shutdown extends Status
}
val contextStack = new ThreadLocal[Stack[ActorContext]] {
override def initialValue = Stack[ActorContext]()
}
@ -72,7 +66,7 @@ private[akka] class ActorCell(
val guard = new ReentrantGuard // TODO: remove this last synchronization point
@volatile
var status: Status = Status.Running
var terminated = false
@volatile
var mailbox: AnyRef = _
@ -111,14 +105,13 @@ private[akka] class ActorCell(
def dispatcher: MessageDispatcher = props.dispatcher
def isRunning: Boolean = status == Status.Running
def isShutdown: Boolean = status == Status.Shutdown
def isRunning: Boolean = !terminated
def isShutdown: Boolean = terminated
def start(): Unit = {
if (isShutdown) throw new ActorStartException("Can't start an actor that has been stopped")
if (props.supervisor.isDefined) props.supervisor.get.link(self)
dispatcher.attach(this)
Actor.registry.register(self)
dispatcher.systemDispatch(SystemMessageInvocation(this, Create, NullChannel))
}
def newActor(restart: Boolean): Actor = {
@ -153,33 +146,8 @@ private[akka] class ActorCell(
def resume(): Unit = dispatcher.resume(this)
private[akka] def stop(): Unit = guard.withGuard {
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
status = Status.Shutdown
dispatcher.detach(this)
try {
val a = actor.get
if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
if (a ne null) a.postStop()
{ //Stop supervised actors
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove()
}
}
} finally {
//if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false)
currentMessage = null
clearActorContext()
}
}
}
private[akka] def stop(): Unit =
if (!terminated) dispatcher.systemDispatch(SystemMessageInvocation(this, Terminate, NullChannel))
def link(actorRef: ActorRef): ActorRef = {
guard.withGuard {
@ -252,7 +220,79 @@ private[akka] class ActorCell(
case msg msg.channel
}
def systemInvoke(envelope: SystemMessageInvocation): Unit = {
var isTerminated = terminated
def create(recreation: Boolean): Unit = try {
actor.get() match {
case null
val created = newActor(restart = false)
actor.set(created)
created.preStart()
Actor.registry.register(self)
case instance if recreation
restart(new Exception("Restart commanded"), None, None)
case _
}
} catch {
case e
e.printStackTrace()
envelope.channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Death(self, e, false) else throw e
}
def suspend(): Unit = dispatcher suspend this
def resume(): Unit = dispatcher resume this
def terminate(): Unit = {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
isTerminated = true
dispatcher.detach(this)
try {
val a = actor.get
if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
if (a ne null) a.postStop()
{ //Stop supervised actors
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove()
}
}
} finally {
if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false)
currentMessage = null
clearActorContext()
}
}
guard.lock.lock()
try {
if (!isTerminated) {
envelope.message match {
case Create create(recreation = false)
case Recreate create(recreation = true)
case Suspend suspend()
case Resume resume()
case Terminate terminate()
}
}
} catch {
case e //Should we really catch everything here?
EventHandler.error(e, actor.get(), e.getMessage)
throw e
} finally {
terminated = isTerminated
guard.lock.unlock()
}
}
def invoke(messageHandle: MessageInvocation): Unit = {
var isTerminated = terminated
guard.lock.lock()
try {
if (!isShutdown) {
@ -261,17 +301,7 @@ private[akka] class ActorCell(
try {
cancelReceiveTimeout() // FIXME: leave this here?
val a = actor.get() match {
case null
val created = newActor(restart = false)
actor.set(created)
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
created.preStart()
created
case instance instance
}
a.apply(messageHandle.message)
actor.get().apply(messageHandle.message)
currentMessage = null // reset current message after successful invocation
} catch {
case e
@ -294,9 +324,11 @@ private[akka] class ActorCell(
throw e
}
} else {
messageHandle.channel sendException new ActorKilledException("Actor has been stopped")
// throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side
}
} finally {
terminated = isTerminated
guard.lock.unlock()
}
}

View file

@ -92,9 +92,19 @@ class Dispatcher(
protected[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
if (mbox ne null) {
mbox enqueue invocation
registerForExecution(mbox)
}
}
protected[akka] def systemDispatch(invocation: SystemMessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
if (mbox ne null) {
mbox systemEnqueue invocation
registerForExecution(mbox)
}
}
protected[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) {
try executorService.get() execute invocation
@ -142,7 +152,7 @@ class Dispatcher(
protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended
try {
executorService.get() execute mbox
} catch {
@ -196,7 +206,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
case ie: InterruptedException Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!self.isEmpty)
if (!self.isEmpty || !self.systemMessages.isEmpty)
dispatcher.reRegisterForExecution(this)
}
}
@ -207,6 +217,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
processAllSystemMessages()
if (!self.suspended.locked) {
var nextMessage = self.dequeue
if (nextMessage ne null) { //If we have a message
@ -219,6 +230,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
else 0
do {
nextMessage.invoke
processAllSystemMessages()
nextMessage =
if (self.suspended.locked) {
null // If we are suspended, abort

View file

@ -16,12 +16,24 @@ class MessageQueueAppendFailedException(message: String, cause: Throwable = null
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock
val suspended = new SimpleLock
val dispatcherLock = new SimpleLock(startLocked = false)
val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
val systemMessages = new ConcurrentLinkedQueue[SystemMessageInvocation]()
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def systemEnqueue(handle: SystemMessageInvocation): Unit = systemMessages.offer(handle)
def systemDequeue(): SystemMessageInvocation = systemMessages.poll()
def size: Int
def isEmpty: Boolean
def processAllSystemMessages(): Unit = {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
nextMessage.invoke()
nextMessage = systemDequeue()
}
}
}
/**

View file

@ -16,14 +16,22 @@ import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class MessageInvocation(val receiver: ActorCell,
val message: Any,
val channel: UntypedChannel) {
final case class MessageInvocation(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() {
receiver invoke this
final def invoke() { receiver invoke this }
}
sealed trait SystemMessage
case object Create extends SystemMessage
case object Recreate extends SystemMessage
case object Suspend extends SystemMessage
case object Resume extends SystemMessage
case object Terminate extends SystemMessage
final case class SystemMessageInvocation(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() { receiver systemInvoke this }
}
final case class TaskInvocation(function: () Unit, cleanup: () Unit) extends Runnable {
@ -73,15 +81,12 @@ abstract class MessageDispatcher extends Serializable {
* Attaches the specified actor instance to this dispatcher
*/
final def attach(actor: ActorCell): Unit = {
val promise = new ActorPromise(Timeout.never)(this)
guard.lock.lock()
try {
register(actor)
dispatchMessage(new MessageInvocation(actor, Init, promise))
} finally {
guard.lock.unlock()
}
promise.get
}
/**
@ -226,6 +231,11 @@ abstract class MessageDispatcher extends Serializable {
*/
protected[akka] def dispatch(invocation: MessageInvocation)
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
protected[akka] def systemDispatch(invocation: SystemMessageInvocation)
protected[akka] def executeTask(invocation: TaskInvocation)
/**

View file

@ -55,9 +55,7 @@ class ReadWriteGuard {
* A very simple lock that uses CCAS (Compare Compare-And-Swap)
* Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods
*/
class SimpleLock {
val acquired = new AtomicBoolean(false)
class SimpleLock(startLocked: Boolean = false) extends AtomicBoolean(startLocked) {
def ifPossible(perform: () Unit): Boolean = {
if (tryLock()) {
try {
@ -89,20 +87,13 @@ class SimpleLock {
} else None
}
def tryLock() = {
if (acquired.get) false
else acquired.compareAndSet(false, true)
}
def tryLock() = compareAndSet(false, true)
def tryUnlock() = {
acquired.compareAndSet(true, false)
}
def tryUnlock() = compareAndSet(true, false)
def locked = acquired.get
def locked = get
def unlock() {
acquired.set(false)
}
def unlock(): Unit = set(false)
}
/**

View file

@ -4,7 +4,6 @@
package akka.testkit
import akka.event.EventHandler
import akka.dispatch.{ MessageDispatcher, MessageInvocation, TaskInvocation, Promise, ActorPromise }
import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import java.util.concurrent.RejectedExecutionException
@ -12,6 +11,7 @@ import akka.util.Switch
import java.lang.ref.WeakReference
import scala.annotation.tailrec
import akka.actor.ActorCell
import akka.dispatch._
/*
* Locking rules:
@ -137,6 +137,10 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty
protected[akka] override def systemDispatch(handle: SystemMessageInvocation) {
handle.invoke() //Roland, look at me
}
protected[akka] override def dispatch(handle: MessageInvocation) {
val mbox = getMailbox(handle.receiver)
val queue = mbox.queue