diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c86a2dc66e..30a20ad769 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -46,7 +46,7 @@ import akka.event.DeathWatch * @author Jonas Bonér */ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { - scalaRef: ScalaActorRef ⇒ + scalaRef: ScalaActorRef with RefInternals ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes /** @@ -108,16 +108,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) - /** - * Suspends the actor. It will not process messages while suspended. - */ - def suspend(): Unit //TODO FIXME REMOVE THIS - - /** - * Resumes a suspended actor. - */ - def resume(): Unit //TODO FIXME REMOVE THIS - /** * Shuts down the actor its dispatcher and message queue. */ @@ -151,7 +141,7 @@ class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Long] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends ActorRef with ScalaActorRef { + extends ActorRef with ScalaActorRef with RefInternals { def name = path.name @@ -260,7 +250,11 @@ trait ScalaActorRef { ref: ActorRef ⇒ * implicit timeout */ def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) +} +private[akka] trait RefInternals { + def resume(): Unit + def suspend(): Unit protected[akka] def restart(cause: Throwable): Unit } @@ -289,7 +283,7 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) { /** * Trait for ActorRef implementations where all methods contain default stubs. */ -trait MinimalActorRef extends ActorRef with ScalaActorRef { +trait MinimalActorRef extends ActorRef with ScalaActorRef with RefInternals { private[akka] val uuid: Uuid = newUuid() def name: String = uuid.toString diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 4656f5a3e3..b079550998 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -122,12 +122,12 @@ abstract class FaultHandlingStrategy { def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.suspend()) + children.foreach(_.asInstanceOf[RefInternals].suspend()) } def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.restart(cause)) + children.foreach(_.asInstanceOf[RefInternals].restart(cause)) } /** @@ -136,7 +136,7 @@ abstract class FaultHandlingStrategy { def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { - case Resume ⇒ child.resume(); true + case Resume ⇒ child.asInstanceOf[RefInternals].resume(); true case Restart ⇒ processFailure(true, child, cause, stats, children); true case Stop ⇒ processFailure(false, child, cause, stats, children); true case Escalate ⇒ false @@ -194,7 +194,7 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) - children.foreach(_.child.restart(cause)) + children.foreach(_.child.asInstanceOf[RefInternals].restart(cause)) else children.foreach(_.child.stop()) } @@ -247,7 +247,7 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) - child.restart(cause) + child.asInstanceOf[RefInternals].restart(cause) else child.stop() //TODO optimization to drop child here already? } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index d9f32f38c9..dc2684f9d8 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -155,6 +155,15 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator: public class Main { // A simple Actor that just prints the messages it processes public static class MyActor extends UntypedActor { + public MyActor() { + self.tell("lowpriority"); + getSelf().tell("lowpriority"); + getSelf().tell("highpriority"); + getSelf().tell("pigdog"); + getSelf().tell("pigdog2"); + getSelf().tell("pigdog3"); + getSelf().tell("highpriority"); + } public void onReceive(Object message) throws Exception { System.out.println(message); } @@ -170,19 +179,9 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator: } }; // We create an instance of the actor that will print out the messages it processes - ActorRef ref = Actors.actorOf(MyActor.class); - // We create a new Priority dispatcher and seed it with the priority generator - ref.setDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen))); + // We create a new Priority dispatcher and seed it with the priority generator + ActorRef ref = Actors.actorOf((new Props()).withCreator(MyActor.class).withDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen)))); - ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-) - ref.tell("lowpriority"); - ref.tell("lowpriority"); - ref.tell("highpriority"); - ref.tell("pigdog"); - ref.tell("pigdog2"); - ref.tell("pigdog3"); - ref.tell("highpriority"); - ref.getDispatcher().resume(ref); // Resuming the actor so it will start treating its messages } } diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index fa00c746f5..e16c336753 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -155,23 +155,18 @@ Creating a Dispatcher using PriorityGenerator: val a = Actor.actorOf( // We create a new Actor that just prints out what it processes Props(new Actor { + self ! 'lowpriority + self ! 'lowpriority + self ! 'highpriority + self ! 'pigdog + self ! 'pigdog2 + self ! 'pigdog3 + self ! 'highpriority def receive = { case x => println(x) } }).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator - a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-) - - a ! 'lowpriority - a ! 'lowpriority - a ! 'highpriority - a ! 'pigdog - a ! 'pigdog2 - a ! 'pigdog3 - a ! 'highpriority - - a.dispatcher.resume(a) // Resuming the actor so it will start treating its messages - Prints: 'highpriority diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a10f8ca63a..385a27f29a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -285,7 +285,7 @@ private[akka] case class RemoteActorRef private[akka] ( remoteAddress: RemoteAddress, path: ActorPath, loader: Option[ClassLoader]) - extends ActorRef with ScalaActorRef { + extends ActorRef with ScalaActorRef with RefInternals { @volatile private var running: Boolean = true