From 86a5114d791eee65ba28a6f3dcf6012eaa5014f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 13 Dec 2011 11:11:21 +0100 Subject: [PATCH] Cleaned up ActorCell, removed all Java-unfriendly methods --- .../scala/akka/actor/ReceiveTimeoutSpec.scala | 8 +-- .../src/main/scala/akka/actor/Actor.scala | 10 +-- .../src/main/scala/akka/actor/ActorCell.scala | 62 +++++++++---------- .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 6 +- .../main/scala/akka/actor/ActorSystem.scala | 17 +++-- akka-actor/src/main/scala/akka/actor/IO.scala | 6 +- akka-docs/scala/code/ActorDocSpec.scala | 2 +- 8 files changed, 56 insertions(+), 57 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index bccf4da5c7..961ddd2736 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -18,7 +18,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { val timeoutLatch = TestLatch() val timeoutActor = system.actorOf(new Actor { - context.receiveTimeout = Some(500 milliseconds) + context.setReceiveTimeout(500 milliseconds) protected def receive = { case ReceiveTimeout ⇒ timeoutLatch.open @@ -34,7 +34,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { case object Tick val timeoutActor = system.actorOf(new Actor { - context.receiveTimeout = Some(500 milliseconds) + context.setReceiveTimeout(500 milliseconds) protected def receive = { case Tick ⇒ () @@ -54,14 +54,14 @@ class ReceiveTimeoutSpec extends AkkaSpec { case object Tick val timeoutActor = system.actorOf(new Actor { - context.receiveTimeout = Some(500 milliseconds) + context.setReceiveTimeout(500 milliseconds) protected def receive = { case Tick ⇒ () case ReceiveTimeout ⇒ count.incrementAndGet timeoutLatch.open - context.receiveTimeout = None + context.resetReceiveTimeout() } }) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index df1bb0e6a5..2c7944a69e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -188,12 +188,12 @@ trait Actor { def noContextError = throw new ActorInitializationException( - "\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." + + "\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + - "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + - "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or" + - "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + - "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem)") + "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem), or" + + "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)") if (contextStack.isEmpty) noContextError val c = contextStack.head diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c17924c56c..1dd7f4fe59 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -59,7 +59,12 @@ trait ActorContext extends ActorRefFactory { * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. * 1 millisecond is the minimum supported timeout. */ - def receiveTimeout_=(timeout: Option[Duration]): Unit + def setReceiveTimeout(timeout: Duration): Unit + + /** + * Resets the current receive timeout. + */ + def resetReceiveTimeout(): Unit /** * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. @@ -68,19 +73,29 @@ trait ActorContext extends ActorRefFactory { */ def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit - def hotswap: Stack[PartialFunction[Any, Unit]] - /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ def unbecome(): Unit + /** + * Returns the current message envelope. + */ def currentMessage: Envelope - def currentMessage_=(invocation: Envelope): Unit + /** + * Returns a stack with the hotswapped behaviors (as Scala PartialFunction). + */ + def hotswap: Stack[PartialFunction[Any, Unit]] + /** + * Returns the sender 'ActorRef' of the current message. + */ def sender: ActorRef + /** + * Returns all supervised children. + */ def children: Iterable[ActorRef] /** @@ -99,16 +114,19 @@ trait ActorContext extends ActorRefFactory { */ implicit def system: ActorSystem + /** + * Returns the supervising parent ActorRef. + */ def parent: ActorRef /** - * Registers this actor as a Monitor for the provided ActorRef + * Registers this actor as a Monitor for the provided ActorRef. * @return the provided ActorRef */ def watch(subject: ActorRef): ActorRef /** - * Unregisters this actor as Monitor for the provided ActorRef + * Unregisters this actor as Monitor for the provided ActorRef. * @return the provided ActorRef */ def unwatch(subject: ActorRef): ActorRef @@ -118,25 +136,13 @@ trait ActorContext extends ActorRefFactory { } trait UntypedActorContext extends ActorContext { + /** * Returns an unmodifiable Java Collection containing the linked actors, * please note that the backing map is thread-safe but not immutable */ def getChildren(): java.lang.Iterable[ActorRef] - /** - * Gets the current receive timeout - * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. - */ - def getReceiveTimeout: Option[Duration] - - /** - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - * 1 millisecond is the minimum supported timeout. - */ - def setReceiveTimeout(timeout: Duration): Unit - /** * Changes the Actor's behavior to become the new 'Procedure' handler. * Puts the behavior on top of the hotswap stack. @@ -190,7 +196,9 @@ private[akka] final class ActorCell( override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None - override def receiveTimeout_=(timeout: Option[Duration]): Unit = { + override def setReceiveTimeout(timeout: Duration): Unit = setReceiveTimeout(Some(timeout)) + + def setReceiveTimeout(timeout: Option[Duration]): Unit = { val timeoutMs = timeout match { case None ⇒ -1L case Some(duration) ⇒ @@ -203,22 +211,14 @@ private[akka] final class ActorCell( receiveTimeoutData = (timeoutMs, receiveTimeoutData._2) } + override def resetReceiveTimeout(): Unit = setReceiveTimeout(None) + /** * In milliseconds */ var receiveTimeoutData: (Long, Cancellable) = if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData - /** - * UntypedActorContext impl - */ - def getReceiveTimeout: Option[Duration] = receiveTimeout - - /** - * UntypedActorContext impl - */ - def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout) - var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs private def _actorOf(props: Props, name: String): ActorRef = { @@ -391,7 +391,7 @@ private[akka] final class ActorCell( def resume(): Unit = dispatcher resume this def terminate() { - receiveTimeout = None + setReceiveTimeout(None) cancelReceiveTimeout val c = children diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a5253c440b..cea2c1cea4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -455,7 +455,7 @@ class AskActorRef( } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher) + new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName))))(dispatcher) override def isTerminated = result.isCompleted || result.isExpired diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d68a1349f0..44c9eb258e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -388,14 +388,14 @@ class LocalActorRefProvider( override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.stop() - case _ ⇒ log.error(this + " received unexpected message " + message) + case _ ⇒ log.error(this + " received unexpected message [" + message + "]") }) override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case ChildTerminated(child) ⇒ stop() - case _ ⇒ log.error(this + " received unexpected system message " + message) + case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } } } @@ -538,7 +538,7 @@ class LocalActorRefProvider( actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.name) - case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) + case unknown ⇒ throw new Exception("Don't know how to create this Actor - cause [" + unknown + "]") } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 31caf6083b..bedc25c093 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -104,8 +104,7 @@ object ActorSystem { val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") if (ConfigVersion != Version) - throw new ConfigurationException("Akka JAR version [" + Version + - "] does not match the provided config version [" + ConfigVersion + "]") + throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") override def toString: String = config.root.render } @@ -327,7 +326,7 @@ abstract class ActorSystem extends ActorRefFactory { class ActorSystemImpl(val name: String, applicationConfig: Config) extends ActorSystem { if (!name.matches("""^\w+$""")) - throw new IllegalArgumentException("invalid ActorSystem name '" + name + "', must contain only word characters (i.e. [a-zA-Z_0-9])") + throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])") import ActorSystem._ @@ -464,8 +463,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } /* - * This is called after the last actor has signaled its termination, i.e. - * after the last dispatcher has had its chance to schedule its shutdown + * This is called after the last actor has signaled its termination, i.e. + * after the last dispatcher has had its chance to schedule its shutdown * action. */ protected def stopScheduler(): Unit = scheduler match { @@ -492,7 +491,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process case null ⇒ try { // Signal was successfully sent ext.createExtension(this) match { // Create and initialize the extension - case null ⇒ throw new IllegalStateException("Extension instance created as null for Extension: " + ext) + case null ⇒ throw new IllegalStateException("Extension instance created as 'null' for extension [" + ext + "]") case instance ⇒ extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension instance //Profit! @@ -511,7 +510,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match { - case null ⇒ throw new IllegalArgumentException("Trying to get non-registered extension " + ext) + case null ⇒ throw new IllegalArgumentException("Trying to get non-registered extension [" + ext + "]") case some ⇒ some.asInstanceOf[T] } @@ -524,8 +523,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor getObjectFor[AnyRef](fqcn).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); case Right(p: ExtensionId[_]) ⇒ registerExtension(p); - case Right(other) ⇒ log.error("'{}' is not an ExtensionIdProvider or ExtensionId, skipping...", fqcn) - case Left(problem) ⇒ log.error(problem, "While trying to load extension '{}', skipping...", fqcn) + case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) + case Left(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) } } diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 1a92679c4b..1551eef2ec 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -193,7 +193,7 @@ trait IO { private def run() { _next match { case ByteStringLength(continuation, handle, message, waitingFor) ⇒ - context.currentMessage = message + context.asInstanceOf[ActorCell].currentMessage = message val st = state(handle) if (st.readBytes.length >= waitingFor) { val bytes = st.readBytes.take(waitingFor) //.compact @@ -202,7 +202,7 @@ trait IO { run() } case bsd @ ByteStringDelimited(continuation, handle, message, delimiter, inclusive, scanned) ⇒ - context.currentMessage = message + context.asInstanceOf[ActorCell].currentMessage = message val st = state(handle) val idx = st.readBytes.indexOfSlice(delimiter, scanned) if (idx >= 0) { @@ -215,7 +215,7 @@ trait IO { _next = bsd.copy(scanned = math.min(idx - delimiter.length, 0)) } case ByteStringAny(continuation, handle, message) ⇒ - context.currentMessage = message + context.asInstanceOf[ActorCell].currentMessage = message val st = state(handle) if (st.readBytes.length > 0) { val bytes = st.readBytes //.compact diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index 744f439c91..8ee7dac4d9 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -221,7 +221,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { import akka.actor.ReceiveTimeout import akka.util.duration._ class MyActor extends Actor { - context.receiveTimeout = Some(30 seconds) + context.setReceiveTimeout(30 milliseconds) def receive = { case "Hello" ⇒ //... case ReceiveTimeout ⇒ throw new RuntimeException("received timeout")