diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index dcca865b24..6eb2292a09 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -11,11 +11,11 @@ import akka.testkit._ import akka.util.duration._ import akka.testkit.Testing.sleepFor import akka.config.Supervision.{ OneForOnePermanentStrategy } -import akka.dispatch.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.lang.IllegalStateException import akka.util.ReflectiveAccess import akka.actor.Actor.actorOf +import akka.dispatch.{ DefaultPromise, Promise, Future } object ActorRefSpec { @@ -115,6 +115,23 @@ object ActorRefSpec { class ActorRefSpec extends WordSpec with MustMatchers { import akka.actor.ActorRefSpec._ + def promiseIntercept(f: ⇒ Actor)(to: Promise[Actor]): Actor = try { + val r = f + to.completeWithResult(r) + r + } catch { + case e ⇒ + to.completeWithException(e) + throw e + } + + def wrap[T](f: Promise[Actor] ⇒ T): T = { + val result = new DefaultPromise[Actor](10 * 60 * 1000) + val r = f(result) + result.get + r + } + "An ActorRef" must { "not allow Actors to be created outside of an actorOf" in { @@ -123,10 +140,11 @@ class ActorRefSpec extends WordSpec with MustMatchers { } intercept[akka.actor.ActorInitializationException] { - actorOf(new Actor { - val nested = new Actor { def receive = { case _ ⇒ } } - def receive = { case _ ⇒ } - }) + wrap(result ⇒ + actorOf(new Actor { + val nested = promiseIntercept(new Actor { def receive = { case _ ⇒ } })(result) + def receive = { case _ ⇒ } + })) } def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None @@ -134,69 +152,80 @@ class ActorRefSpec extends WordSpec with MustMatchers { contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingOuterActor(actorOf(new InnerActor))) + wrap(result ⇒ + actorOf(promiseIntercept(new FailingOuterActor(actorOf(new InnerActor)))(result))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new OuterActor(actorOf(new FailingInnerActor))) + wrap(result ⇒ + actorOf(new OuterActor(actorOf(promiseIntercept(new FailingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingInheritingOuterActor(actorOf(new InnerActor))) + wrap(result ⇒ + actorOf(promiseIntercept(new FailingInheritingOuterActor(actorOf(new InnerActor)))(result))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingOuterActor(actorOf(new FailingInheritingInnerActor))) + wrap(result ⇒ + actorOf(new FailingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingInheritingOuterActor(actorOf(new FailingInheritingInnerActor))) + wrap(result ⇒ + actorOf(new FailingInheritingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingInheritingOuterActor(actorOf(new FailingInnerActor))) + wrap(result ⇒ + actorOf(new FailingInheritingOuterActor(actorOf(promiseIntercept(new FailingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new OuterActor(actorOf(new InnerActor { - val a = new InnerActor - }))) + wrap(result ⇒ + actorOf(new OuterActor(actorOf(new InnerActor { + val a = promiseIntercept(new InnerActor)(result) + })))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new FailingOuterActor(actorOf(new FailingInheritingInnerActor))) + wrap(result ⇒ + actorOf(new FailingOuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new OuterActor(actorOf(new FailingInheritingInnerActor))) + wrap(result ⇒ + actorOf(new OuterActor(actorOf(promiseIntercept(new FailingInheritingInnerActor)(result))))) } contextStackMustBeEmpty intercept[akka.actor.ActorInitializationException] { - actorOf(new OuterActor(actorOf({ new InnerActor; new InnerActor }))) + wrap(result ⇒ + actorOf(new OuterActor(actorOf(promiseIntercept({ new InnerActor; new InnerActor })(result))))) } contextStackMustBeEmpty (intercept[java.lang.IllegalStateException] { - actorOf(new OuterActor(actorOf({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor }))) + wrap(result ⇒ + actorOf(new OuterActor(actorOf(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result))))) }).getMessage must be === "Ur state be b0rked" contextStackMustBeEmpty diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 235972ec61..634c9126d8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -56,9 +56,8 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) } -case object Init extends AutoReceivedMessage with LifeCycleMessage case class Death(deceased: ActorRef, cause: Throwable, recoverable: Boolean) extends AutoReceivedMessage with LifeCycleMessage -case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage +case class Crash(reason: Throwable) extends AutoReceivedMessage case object RevertHotSwap extends AutoReceivedMessage @@ -657,14 +656,13 @@ trait Actor { */ msg match { - case Init ⇒ reply(()); false //All gud nao FIXME remove reply when we can have fully async init case HotSwap(code, discardOld) ⇒ become(code(self), discardOld); false case RevertHotSwap ⇒ unbecome(); false case d: Death ⇒ context.handleDeath(d); false case Link(child) ⇒ self.link(child); false case Unlink(child) ⇒ self.unlink(child); false case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop(); false - case Restart(reason) ⇒ throw reason + case Crash(reason) ⇒ throw reason case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ val ch = channel diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2d17abf2d5..4555e1cdad 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -49,12 +49,6 @@ private[akka] trait ActorContext { } private[akka] object ActorCell { - sealed trait Status - object Status { - object Running extends Status - object Shutdown extends Status - } - val contextStack = new ThreadLocal[Stack[ActorContext]] { override def initialValue = Stack[ActorContext]() } @@ -72,7 +66,7 @@ private[akka] class ActorCell( val guard = new ReentrantGuard // TODO: remove this last synchronization point @volatile - var status: Status = Status.Running + var terminated = false @volatile var mailbox: AnyRef = _ @@ -111,14 +105,13 @@ private[akka] class ActorCell( def dispatcher: MessageDispatcher = props.dispatcher - def isRunning: Boolean = status == Status.Running - def isShutdown: Boolean = status == Status.Shutdown + def isRunning: Boolean = !terminated + def isShutdown: Boolean = terminated def start(): Unit = { - if (isShutdown) throw new ActorStartException("Can't start an actor that has been stopped") if (props.supervisor.isDefined) props.supervisor.get.link(self) dispatcher.attach(this) - Actor.registry.register(self) + dispatcher.systemDispatch(SystemMessageInvocation(this, Create, NullChannel)) } def newActor(restart: Boolean): Actor = { @@ -153,33 +146,8 @@ private[akka] class ActorCell( def resume(): Unit = dispatcher.resume(this) - private[akka] def stop(): Unit = guard.withGuard { - if (isRunning) { - receiveTimeout = None - cancelReceiveTimeout - Actor.registry.unregister(self) - status = Status.Shutdown - dispatcher.detach(this) - try { - val a = actor.get - if (Actor.debugLifecycle) EventHandler.debug(a, "stopping") - if (a ne null) a.postStop() - - { //Stop supervised actors - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove() - } - } - - } finally { - //if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false) - currentMessage = null - clearActorContext() - } - } - } + private[akka] def stop(): Unit = + if (!terminated) dispatcher.systemDispatch(SystemMessageInvocation(this, Terminate, NullChannel)) def link(actorRef: ActorRef): ActorRef = { guard.withGuard { @@ -252,7 +220,79 @@ private[akka] class ActorCell( case msg ⇒ msg.channel } + def systemInvoke(envelope: SystemMessageInvocation): Unit = { + var isTerminated = terminated + + def create(recreation: Boolean): Unit = try { + actor.get() match { + case null ⇒ + val created = newActor(restart = false) + actor.set(created) + created.preStart() + Actor.registry.register(self) + case instance if recreation ⇒ + restart(new Exception("Restart commanded"), None, None) + case _ ⇒ + } + } catch { + case e ⇒ + e.printStackTrace() + envelope.channel.sendException(e) + if (supervisor.isDefined) supervisor.get ! Death(self, e, false) else throw e + } + + def suspend(): Unit = dispatcher suspend this + + def resume(): Unit = dispatcher resume this + + def terminate(): Unit = { + receiveTimeout = None + cancelReceiveTimeout + Actor.registry.unregister(self) + isTerminated = true + dispatcher.detach(this) + try { + val a = actor.get + if (Actor.debugLifecycle) EventHandler.debug(a, "stopping") + if (a ne null) a.postStop() + + { //Stop supervised actors + val i = _linkedActors.values.iterator + while (i.hasNext) { + i.next.stop() + i.remove() + } + } + + } finally { + if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false) + currentMessage = null + clearActorContext() + } + } + guard.lock.lock() + try { + if (!isTerminated) { + envelope.message match { + case Create ⇒ create(recreation = false) + case Recreate ⇒ create(recreation = true) + case Suspend ⇒ suspend() + case Resume ⇒ resume() + case Terminate ⇒ terminate() + } + } + } catch { + case e ⇒ //Should we really catch everything here? + EventHandler.error(e, actor.get(), e.getMessage) + throw e + } finally { + terminated = isTerminated + guard.lock.unlock() + } + } + def invoke(messageHandle: MessageInvocation): Unit = { + var isTerminated = terminated guard.lock.lock() try { if (!isShutdown) { @@ -261,17 +301,7 @@ private[akka] class ActorCell( try { cancelReceiveTimeout() // FIXME: leave this here? - val a = actor.get() match { - case null ⇒ - val created = newActor(restart = false) - actor.set(created) - if (Actor.debugLifecycle) EventHandler.debug(created, "started") - created.preStart() - created - case instance ⇒ instance - } - - a.apply(messageHandle.message) + actor.get().apply(messageHandle.message) currentMessage = null // reset current message after successful invocation } catch { case e ⇒ @@ -294,9 +324,11 @@ private[akka] class ActorCell( throw e } } else { + messageHandle.channel sendException new ActorKilledException("Actor has been stopped") // throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side } } finally { + terminated = isTerminated guard.lock.unlock() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 21542f064f..b232ca6270 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -92,8 +92,18 @@ class Dispatcher( protected[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) - mbox enqueue invocation - registerForExecution(mbox) + if (mbox ne null) { + mbox enqueue invocation + registerForExecution(mbox) + } + } + + protected[akka] def systemDispatch(invocation: SystemMessageInvocation) = { + val mbox = getMailbox(invocation.receiver) + if (mbox ne null) { + mbox systemEnqueue invocation + registerForExecution(mbox) + } } protected[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) { @@ -142,7 +152,7 @@ class Dispatcher( protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { if (mbox.dispatcherLock.tryLock()) { - if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended + if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended try { executorService.get() execute mbox } catch { @@ -196,7 +206,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt } finally { dispatcherLock.unlock() - if (!self.isEmpty) + if (!self.isEmpty || !self.systemMessages.isEmpty) dispatcher.reRegisterForExecution(this) } } @@ -207,6 +217,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox() { + processAllSystemMessages() if (!self.suspended.locked) { var nextMessage = self.dequeue if (nextMessage ne null) { //If we have a message @@ -219,6 +230,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ else 0 do { nextMessage.invoke + processAllSystemMessages() nextMessage = if (self.suspended.locked) { null // If we are suspended, abort diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 03be4d08dc..ece5c3d1a9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -16,12 +16,24 @@ class MessageQueueAppendFailedException(message: String, cause: Throwable = null * @author Jonas Bonér */ trait MessageQueue { - val dispatcherLock = new SimpleLock - val suspended = new SimpleLock + val dispatcherLock = new SimpleLock(startLocked = false) + val suspended = new SimpleLock(startLocked = false) //(startLocked = true) + val systemMessages = new ConcurrentLinkedQueue[SystemMessageInvocation]() + def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation + def systemEnqueue(handle: SystemMessageInvocation): Unit = systemMessages.offer(handle) + def systemDequeue(): SystemMessageInvocation = systemMessages.poll() def size: Int def isEmpty: Boolean + + def processAllSystemMessages(): Unit = { + var nextMessage = systemDequeue() + while (nextMessage ne null) { + nextMessage.invoke() + nextMessage = systemDequeue() + } + } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 2156c06a6b..a5dddbda12 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -16,14 +16,22 @@ import akka.actor._ /** * @author Jonas Bonér */ -final case class MessageInvocation(val receiver: ActorCell, - val message: Any, - val channel: UntypedChannel) { +final case class MessageInvocation(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") - final def invoke() { - receiver invoke this - } + final def invoke() { receiver invoke this } +} + +sealed trait SystemMessage +case object Create extends SystemMessage +case object Recreate extends SystemMessage +case object Suspend extends SystemMessage +case object Resume extends SystemMessage +case object Terminate extends SystemMessage + +final case class SystemMessageInvocation(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) { + if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") + final def invoke() { receiver systemInvoke this } } final case class TaskInvocation(function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { @@ -73,15 +81,12 @@ abstract class MessageDispatcher extends Serializable { * Attaches the specified actor instance to this dispatcher */ final def attach(actor: ActorCell): Unit = { - val promise = new ActorPromise(Timeout.never)(this) guard.lock.lock() try { register(actor) - dispatchMessage(new MessageInvocation(actor, Init, promise)) } finally { guard.lock.unlock() } - promise.get } /** @@ -226,6 +231,11 @@ abstract class MessageDispatcher extends Serializable { */ protected[akka] def dispatch(invocation: MessageInvocation) + /** + * Will be called when the dispatcher is to queue an invocation for execution + */ + protected[akka] def systemDispatch(invocation: SystemMessageInvocation) + protected[akka] def executeTask(invocation: TaskInvocation) /** diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 27cb8807ae..ecef3de2f5 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -55,9 +55,7 @@ class ReadWriteGuard { * A very simple lock that uses CCAS (Compare Compare-And-Swap) * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods */ -class SimpleLock { - val acquired = new AtomicBoolean(false) - +class SimpleLock(startLocked: Boolean = false) extends AtomicBoolean(startLocked) { def ifPossible(perform: () ⇒ Unit): Boolean = { if (tryLock()) { try { @@ -89,20 +87,13 @@ class SimpleLock { } else None } - def tryLock() = { - if (acquired.get) false - else acquired.compareAndSet(false, true) - } + def tryLock() = compareAndSet(false, true) - def tryUnlock() = { - acquired.compareAndSet(true, false) - } + def tryUnlock() = compareAndSet(true, false) - def locked = acquired.get + def locked = get - def unlock() { - acquired.set(false) - } + def unlock(): Unit = set(false) } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 340d44c364..df632c31a3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -4,7 +4,6 @@ package akka.testkit import akka.event.EventHandler -import akka.dispatch.{ MessageDispatcher, MessageInvocation, TaskInvocation, Promise, ActorPromise } import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import java.util.concurrent.RejectedExecutionException @@ -12,6 +11,7 @@ import akka.util.Switch import java.lang.ref.WeakReference import scala.annotation.tailrec import akka.actor.ActorCell +import akka.dispatch._ /* * Locking rules: @@ -137,6 +137,10 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty + protected[akka] override def systemDispatch(handle: SystemMessageInvocation) { + handle.invoke() //Roland, look at me + } + protected[akka] override def dispatch(handle: MessageInvocation) { val mbox = getMailbox(handle.receiver) val queue = mbox.queue