Merge remote-tracking branch 'origin/master' into wip-remote-supervision-rk

This commit is contained in:
Roland 2011-12-13 16:59:43 +01:00
commit 92e7693203
247 changed files with 7896 additions and 4703 deletions

View file

@ -17,9 +17,6 @@ import akka.event.EventStream
import akka.actor.ActorSystem.Settings
import com.typesafe.config.Config
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class Envelope(val message: Any, val sender: ActorRef) {
if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null")
}
@ -87,9 +84,6 @@ object MessageDispatcher {
implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable {
import MessageDispatcher._
@ -138,7 +132,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
shutdownScheduleUpdater.get(this) match {
case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
scheduler.scheduleOnce(shutdownTimeout, shutdownAction)
scheduleShutdownAction()
()
} else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED
@ -149,6 +143,13 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
case _ ()
}
private def scheduleShutdownAction(): Unit = {
// IllegalStateException is thrown if scheduler has been shutdown
try scheduler.scheduleOnce(shutdownTimeout, shutdownAction) catch {
case _: IllegalStateException shutdown()
}
}
private final val taskCleanup: () Unit =
() if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
@ -169,36 +170,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
val mailBox = actor.mailbox
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
actor.mailbox = deadLetterMailbox
cleanUpMailboxFor(actor, mailBox)
mailBox.cleanUp()
}
/**
* Overridable callback to clean up the mailbox for a given actor,
* called when an actor is unregistered.
*/
protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
if (mailBox.hasSystemMessages) {
var message = mailBox.systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
deadLetterMailbox.systemEnqueue(actor.self, message)
message = next
}
}
if (mailBox.hasMessages) {
var envelope = mailBox.dequeue
while (envelope ne null) {
deadLetterMailbox.enqueue(actor.self, envelope)
envelope = mailBox.dequeue
}
}
}
private val shutdownAction = new Runnable {
@tailrec
final def run() {
@ -213,9 +187,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
try scheduler.scheduleOnce(shutdownTimeout, this) catch {
case _: IllegalStateException shutdown()
}
scheduleShutdownAction()
else run()
}
}
@ -224,7 +196,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
/**
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or default specified in
* akka-actor-reference.conf
* reference.conf
*/
protected[akka] def shutdownTimeout: Duration
@ -290,7 +262,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
/**
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
* Trait to be used for hooking in new dispatchers into Dispatchers.from(cfg: Config)
*/
abstract class MessageDispatcherConfigurator() {
/**
@ -307,9 +279,10 @@ abstract class MessageDispatcherConfigurator() {
}
}
def configureThreadPool(config: Config,
settings: Settings,
createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
def configureThreadPool(
config: Config,
settings: Settings,
createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the config