Merge branch 'master' into derekjw-future

This commit is contained in:
Derek Williams 2011-02-22 18:58:32 -07:00
commit 71a4e0cb8a
5 changed files with 95 additions and 97 deletions

View file

@ -23,7 +23,10 @@ import akka.japi. {Creator, Procedure}
*/
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends LifeCycleMessage {
/* Marker trait to show which Messages are automatically handled by Akka */
sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage {
/**
* Java API
*/
@ -40,22 +43,22 @@ case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
def this(code: akka.japi.Function[ActorRef,Procedure[Any]]) = this(code, true)
}
case object RevertHotSwap extends LifeCycleMessage
case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage
case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage
case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage
case class Link(child: ActorRef) extends LifeCycleMessage
case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
case class Unlink(child: ActorRef) extends LifeCycleMessage
case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage
case object ReceiveTimeout extends LifeCycleMessage
case object PoisonPill extends LifeCycleMessage
case class MaximumNumberOfRestartsWithinTimeRangeReached(
@BeanProperty val victim: ActorRef,
@BeanProperty val maxNrOfRetries: Option[Int],
@ -303,8 +306,7 @@ trait Actor extends Logging {
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
val ref = optRef.asInstanceOf[Some[ActorRef]].get
ref.id = getClass.getName //FIXME: Is this needed?
optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed?
optRef.asInstanceOf[Some[ActorRef]]
}
@ -426,56 +428,57 @@ trait Actor extends Logging {
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome: Unit = if (!self.hotswap.isEmpty) self.hotswap = self.hotswap.pop
def unbecome: Unit = {
val h = self.hotswap
if (h.nonEmpty)
self.hotswap = h.pop
}
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def apply(msg: Any) = fullBehavior(msg)
private[akka] final def apply(msg: Any) = fullBehavior(msg) //TODO: Scala 2.9.0 => processingBehavior.applyOrElse(msg, unhandledMsgFun)
private final def autoReceiveMessage(msg: AutoReceivedMessage) {
msg match {
case HotSwap(code,discardOld) => become(code(self),discardOld)
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
case PoisonPill => {
val f = self.senderFuture
if(f.isDefined) {
f.get.completeWithException(new ActorKilledException("PoisonPill"))
}
self.stop
}
}
}
/*Processingbehavior and fullBehavior are duplicates so make sure changes are done to both */
private lazy val processingBehavior: Receive = {
lazy val defaultBehavior = receive
val defaultBehavior = receive
val actorBehavior: Receive = {
case HotSwap(code,discardOld) => become(code(self),discardOld)
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
case PoisonPill => if(self.senderFuture.isDefined) {
self.senderFuture.get.completeWithException(
new ActorKilledException("PoisonPill")
)
}
self.stop
case msg if !self.hotswap.isEmpty &&
case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if self.hotswap.nonEmpty &&
self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg)
case msg if self.hotswap.isEmpty &&
defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg)
}
actorBehavior
}
//TODO: Scala2.9.0 replace with: val unhandledMsgFun: Any => Unit = unhandled _
private lazy val fullBehavior: Receive = {
lazy val defaultBehavior = receive
val defaultBehavior = receive
val actorBehavior: Receive = {
case HotSwap(code, discardOld) => become(code(self), discardOld)
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
case PoisonPill => if(self.senderFuture.isDefined) {
self.senderFuture.get.completeWithException(
new ActorKilledException("PoisonPill")
)
}
self.stop
case msg if !self.hotswap.isEmpty &&
case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if self.hotswap.nonEmpty &&
self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg)
case msg if self.hotswap.isEmpty &&
defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg)

View file

@ -829,7 +829,16 @@ class LocalActorRef private[akka] (
else {
currentMessage = messageHandle
try {
dispatch(messageHandle)
Actor.log.slf4j.trace("Invoking actor with message: {}\n", messageHandle)
try {
cancelReceiveTimeout // FIXME: leave this here?
actor(messageHandle.message)
} catch {
case e: InterruptedException => {} // received message while actor is shutting down, ignore
case e => handleExceptionInDispatch(e, messageHandle.message)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e =>
Actor.log.slf4j.error("Could not invoke actor [{}]", this)
@ -1003,22 +1012,6 @@ class LocalActorRef private[akka] (
a
}
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.slf4j.trace("Invoking actor with message: {}\n", messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
try {
cancelReceiveTimeout // FIXME: leave this here?
actor(message)
} catch {
case e: InterruptedException => {} // received message while actor is shutting down, ignore
case e => handleExceptionInDispatch(e, message)
}
finally {
checkReceiveTimeout // Reschedule receive timeout
}
}
private def shutDownTemporaryActor(temporaryActor: ActorRef) {
Actor.log.slf4j.info("Actor [{}] configured as TEMPORARY and will not be restarted.", temporaryActor.id)
temporaryActor.stop

View file

@ -9,7 +9,7 @@ import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenDispatcher(
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) {
try {
executorService.get() execute mbox
} catch {
@ -143,13 +143,13 @@ class ExecutorBasedEventDrivenDispatcher(
def suspend(actorRef: ActorRef) {
log.slf4j.debug("Suspending {}",actorRef.uuid)
getMailbox(actorRef).suspended.switchOn
getMailbox(actorRef).suspended.tryLock
}
def resume(actorRef: ActorRef) {
log.slf4j.debug("Resuming {}",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
mbox.suspended.tryUnlock
registerForExecution(mbox)
}
}
@ -162,12 +162,14 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
def dispatcher: ExecutorBasedEventDrivenDispatcher
final def run = {
val reschedule = try {
try { processMailbox() } catch { case ie: InterruptedException => true }
try {
processMailbox()
} catch {
case ie: InterruptedException =>
} finally {
dispatcherLock.unlock()
}
if (reschedule || !self.isEmpty)
if (!self.isEmpty)
dispatcher.registerForExecution(this)
}
@ -176,33 +178,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
if (self.suspended.isOn)
true
else {
final def processMailbox() {
if (!self.suspended.locked) {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (nextMessage ne null) { //If we have a message
if (dispatcher.throughput <= 1) //If we only run one message per process
nextMessage.invoke //Just run it
else { //But otherwise, if we are throttled, we need to do some book-keeping
var processedMessages = 0
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
do {
nextMessage.invoke
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
if (self.suspended.isOn)
return true
nextMessage = self.dequeue
} while (nextMessage ne null)
nextMessage =
if (self.suspended.locked) {
null //If we are suspended, abort
}
else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
null //We reached our boundaries, abort
else
self.dequeue //Dequeue the next message
}
} while (nextMessage ne null)
}
}
false
}
}
}

View file

@ -95,13 +95,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = try {
if (mailbox.suspended.isOn)
if (mailbox.suspended.locked)
return false
var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (mailbox.suspended.isOn)
if (mailbox.suspended.locked)
return false
messageInvocation = mailbox.dequeue
}
@ -180,12 +180,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.switchOn
getMailbox(actorRef).suspended.tryLock
}
def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
mbox.suspended.tryUnlock
executorService.get() execute mbox
}

View file

@ -19,7 +19,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock
val suspended = new Switch(false)
val suspended = new SimpleLock
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def size: Int