Removing suspend and resume from user-facing API

This commit is contained in:
Viktor Klang 2011-12-02 01:27:42 +01:00
parent 879ea7c2b4
commit d626cc2455
5 changed files with 31 additions and 43 deletions

View file

@ -46,7 +46,7 @@ import akka.event.DeathWatch
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef scalaRef: ScalaActorRef with RefInternals
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
/** /**
@ -108,16 +108,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/ */
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
/**
* Suspends the actor. It will not process messages while suspended.
*/
def suspend(): Unit //TODO FIXME REMOVE THIS
/**
* Resumes a suspended actor.
*/
def resume(): Unit //TODO FIXME REMOVE THIS
/** /**
* Shuts down the actor its dispatcher and message queue. * Shuts down the actor its dispatcher and message queue.
*/ */
@ -151,7 +141,7 @@ class LocalActorRef private[akka] (
val systemService: Boolean = false, val systemService: Boolean = false,
_receiveTimeout: Option[Long] = None, _receiveTimeout: Option[Long] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef with RefInternals {
def name = path.name def name = path.name
@ -260,7 +250,11 @@ trait ScalaActorRef { ref: ActorRef ⇒
* implicit timeout * implicit timeout
*/ */
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
private[akka] trait RefInternals {
def resume(): Unit
def suspend(): Unit
protected[akka] def restart(cause: Throwable): Unit protected[akka] def restart(cause: Throwable): Unit
} }
@ -289,7 +283,7 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
/** /**
* Trait for ActorRef implementations where all methods contain default stubs. * Trait for ActorRef implementations where all methods contain default stubs.
*/ */
trait MinimalActorRef extends ActorRef with ScalaActorRef { trait MinimalActorRef extends ActorRef with ScalaActorRef with RefInternals {
private[akka] val uuid: Uuid = newUuid() private[akka] val uuid: Uuid = newUuid()
def name: String = uuid.toString def name: String = uuid.toString

View file

@ -122,12 +122,12 @@ abstract class FaultHandlingStrategy {
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty) if (children.nonEmpty)
children.foreach(_.suspend()) children.foreach(_.asInstanceOf[RefInternals].suspend())
} }
def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty) if (children.nonEmpty)
children.foreach(_.restart(cause)) children.foreach(_.asInstanceOf[RefInternals].restart(cause))
} }
/** /**
@ -136,7 +136,7 @@ abstract class FaultHandlingStrategy {
def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match { action match {
case Resume child.resume(); true case Resume child.asInstanceOf[RefInternals].resume(); true
case Restart processFailure(true, child, cause, stats, children); true case Restart processFailure(true, child, cause, stats, children); true
case Stop processFailure(false, child, cause, stats, children); true case Stop processFailure(false, child, cause, stats, children); true
case Escalate false case Escalate false
@ -194,7 +194,7 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) { if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow))) if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(_.child.restart(cause)) children.foreach(_.child.asInstanceOf[RefInternals].restart(cause))
else else
children.foreach(_.child.stop()) children.foreach(_.child.stop())
} }
@ -247,7 +247,7 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow)) if (restart && stats.requestRestartPermission(retriesWindow))
child.restart(cause) child.asInstanceOf[RefInternals].restart(cause)
else else
child.stop() //TODO optimization to drop child here already? child.stop() //TODO optimization to drop child here already?
} }

View file

@ -155,6 +155,15 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator:
public class Main { public class Main {
// A simple Actor that just prints the messages it processes // A simple Actor that just prints the messages it processes
public static class MyActor extends UntypedActor { public static class MyActor extends UntypedActor {
public MyActor() {
self.tell("lowpriority");
getSelf().tell("lowpriority");
getSelf().tell("highpriority");
getSelf().tell("pigdog");
getSelf().tell("pigdog2");
getSelf().tell("pigdog3");
getSelf().tell("highpriority");
}
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
System.out.println(message); System.out.println(message);
} }
@ -170,19 +179,9 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator:
} }
}; };
// We create an instance of the actor that will print out the messages it processes // We create an instance of the actor that will print out the messages it processes
ActorRef ref = Actors.actorOf(MyActor.class); // We create a new Priority dispatcher and seed it with the priority generator
// We create a new Priority dispatcher and seed it with the priority generator ActorRef ref = Actors.actorOf((new Props()).withCreator(MyActor.class).withDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen))));
ref.setDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen)));
ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
ref.tell("lowpriority");
ref.tell("lowpriority");
ref.tell("highpriority");
ref.tell("pigdog");
ref.tell("pigdog2");
ref.tell("pigdog3");
ref.tell("highpriority");
ref.getDispatcher().resume(ref); // Resuming the actor so it will start treating its messages
} }
} }

View file

@ -155,23 +155,18 @@ Creating a Dispatcher using PriorityGenerator:
val a = Actor.actorOf( // We create a new Actor that just prints out what it processes val a = Actor.actorOf( // We create a new Actor that just prints out what it processes
Props(new Actor { Props(new Actor {
self ! 'lowpriority
self ! 'lowpriority
self ! 'highpriority
self ! 'pigdog
self ! 'pigdog2
self ! 'pigdog3
self ! 'highpriority
def receive = { def receive = {
case x => println(x) case x => println(x)
} }
}).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator }).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator
a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
a ! 'lowpriority
a ! 'lowpriority
a ! 'highpriority
a ! 'pigdog
a ! 'pigdog2
a ! 'pigdog3
a ! 'highpriority
a.dispatcher.resume(a) // Resuming the actor so it will start treating its messages
Prints: Prints:
'highpriority 'highpriority

View file

@ -285,7 +285,7 @@ private[akka] case class RemoteActorRef private[akka] (
remoteAddress: RemoteAddress, remoteAddress: RemoteAddress,
path: ActorPath, path: ActorPath,
loader: Option[ClassLoader]) loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef with RefInternals {
@volatile @volatile
private var running: Boolean = true private var running: Boolean = true