Cleaned up ActorCell, removed all Java-unfriendly methods

This commit is contained in:
Jonas Bonér 2011-12-13 11:11:21 +01:00
parent 5a79a9104a
commit 86a5114d79
8 changed files with 56 additions and 57 deletions

View file

@ -18,7 +18,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
val timeoutLatch = TestLatch()
val timeoutActor = system.actorOf(new Actor {
context.receiveTimeout = Some(500 milliseconds)
context.setReceiveTimeout(500 milliseconds)
protected def receive = {
case ReceiveTimeout timeoutLatch.open
@ -34,7 +34,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
case object Tick
val timeoutActor = system.actorOf(new Actor {
context.receiveTimeout = Some(500 milliseconds)
context.setReceiveTimeout(500 milliseconds)
protected def receive = {
case Tick ()
@ -54,14 +54,14 @@ class ReceiveTimeoutSpec extends AkkaSpec {
case object Tick
val timeoutActor = system.actorOf(new Actor {
context.receiveTimeout = Some(500 milliseconds)
context.setReceiveTimeout(500 milliseconds)
protected def receive = {
case Tick ()
case ReceiveTimeout
count.incrementAndGet
timeoutLatch.open
context.receiveTimeout = None
context.resetReceiveTimeout()
}
})

View file

@ -188,12 +188,12 @@ trait Actor {
def noContextError =
throw new ActorInitializationException(
"\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." +
"\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." +
"\n\tYou have to use one of the factory methods to create a new actor. Either use:" +
"\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or" +
"\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem)")
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem), or" +
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)")
if (contextStack.isEmpty) noContextError
val c = contextStack.head

View file

@ -59,7 +59,12 @@ trait ActorContext extends ActorRefFactory {
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
* 1 millisecond is the minimum supported timeout.
*/
def receiveTimeout_=(timeout: Option[Duration]): Unit
def setReceiveTimeout(timeout: Duration): Unit
/**
* Resets the current receive timeout.
*/
def resetReceiveTimeout(): Unit
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
@ -68,19 +73,29 @@ trait ActorContext extends ActorRefFactory {
*/
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit
def hotswap: Stack[PartialFunction[Any, Unit]]
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome(): Unit
/**
* Returns the current message envelope.
*/
def currentMessage: Envelope
def currentMessage_=(invocation: Envelope): Unit
/**
* Returns a stack with the hotswapped behaviors (as Scala PartialFunction).
*/
def hotswap: Stack[PartialFunction[Any, Unit]]
/**
* Returns the sender 'ActorRef' of the current message.
*/
def sender: ActorRef
/**
* Returns all supervised children.
*/
def children: Iterable[ActorRef]
/**
@ -99,16 +114,19 @@ trait ActorContext extends ActorRefFactory {
*/
implicit def system: ActorSystem
/**
* Returns the supervising parent ActorRef.
*/
def parent: ActorRef
/**
* Registers this actor as a Monitor for the provided ActorRef
* Registers this actor as a Monitor for the provided ActorRef.
* @return the provided ActorRef
*/
def watch(subject: ActorRef): ActorRef
/**
* Unregisters this actor as Monitor for the provided ActorRef
* Unregisters this actor as Monitor for the provided ActorRef.
* @return the provided ActorRef
*/
def unwatch(subject: ActorRef): ActorRef
@ -118,25 +136,13 @@ trait ActorContext extends ActorRefFactory {
}
trait UntypedActorContext extends ActorContext {
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getChildren(): java.lang.Iterable[ActorRef]
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def getReceiveTimeout: Option[Duration]
/**
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
* 1 millisecond is the minimum supported timeout.
*/
def setReceiveTimeout(timeout: Duration): Unit
/**
* Changes the Actor's behavior to become the new 'Procedure' handler.
* Puts the behavior on top of the hotswap stack.
@ -190,7 +196,9 @@ private[akka] final class ActorCell(
override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None
override def receiveTimeout_=(timeout: Option[Duration]): Unit = {
override def setReceiveTimeout(timeout: Duration): Unit = setReceiveTimeout(Some(timeout))
def setReceiveTimeout(timeout: Option[Duration]): Unit = {
val timeoutMs = timeout match {
case None -1L
case Some(duration)
@ -203,22 +211,14 @@ private[akka] final class ActorCell(
receiveTimeoutData = (timeoutMs, receiveTimeoutData._2)
}
override def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
/**
* In milliseconds
*/
var receiveTimeoutData: (Long, Cancellable) =
if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData
/**
* UntypedActorContext impl
*/
def getReceiveTimeout: Option[Duration] = receiveTimeout
/**
* UntypedActorContext impl
*/
def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout)
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
private def _actorOf(props: Props, name: String): ActorRef = {
@ -391,7 +391,7 @@ private[akka] final class ActorCell(
def resume(): Unit = dispatcher resume this
def terminate() {
receiveTimeout = None
setReceiveTimeout(None)
cancelReceiveTimeout
val c = children

View file

@ -455,7 +455,7 @@ class AskActorRef(
}
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher)
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName))))(dispatcher)
override def isTerminated = result.isCompleted || result.isExpired

View file

@ -388,14 +388,14 @@ class LocalActorRefProvider(
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
case Failed(ex) if sender ne null causeOfTermination = Some(ex); sender.stop()
case _ log.error(this + " received unexpected message " + message)
case _ log.error(this + " received unexpected message [" + message + "]")
})
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(child) stop()
case _ log.error(this + " received unexpected system message " + message)
case _ log.error(this + " received unexpected system message [" + message + "]")
}
}
}
@ -538,7 +538,7 @@ class LocalActorRefProvider(
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.name)
case unknown throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
case unknown throw new Exception("Don't know how to create this Actor - cause [" + unknown + "]")
}
}

View file

@ -104,8 +104,7 @@ object ActorSystem {
val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version +
"] does not match the provided config version [" + ConfigVersion + "]")
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
override def toString: String = config.root.render
}
@ -327,7 +326,7 @@ abstract class ActorSystem extends ActorRefFactory {
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ActorSystem {
if (!name.matches("""^\w+$"""))
throw new IllegalArgumentException("invalid ActorSystem name '" + name + "', must contain only word characters (i.e. [a-zA-Z_0-9])")
throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])")
import ActorSystem._
@ -464,8 +463,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
/*
* This is called after the last actor has signaled its termination, i.e.
* after the last dispatcher has had its chance to schedule its shutdown
* This is called after the last actor has signaled its termination, i.e.
* after the last dispatcher has had its chance to schedule its shutdown
* action.
*/
protected def stopScheduler(): Unit = scheduler match {
@ -492,7 +491,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
case null try { // Signal was successfully sent
ext.createExtension(this) match { // Create and initialize the extension
case null throw new IllegalStateException("Extension instance created as null for Extension: " + ext)
case null throw new IllegalStateException("Extension instance created as 'null' for extension [" + ext + "]")
case instance
extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension
instance //Profit!
@ -511,7 +510,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match {
case null throw new IllegalArgumentException("Trying to get non-registered extension " + ext)
case null throw new IllegalArgumentException("Trying to get non-registered extension [" + ext + "]")
case some some.asInstanceOf[T]
}
@ -524,8 +523,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
getObjectFor[AnyRef](fqcn).fold(_ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
case Right(p: ExtensionIdProvider) registerExtension(p.lookup());
case Right(p: ExtensionId[_]) registerExtension(p);
case Right(other) log.error("'{}' is not an ExtensionIdProvider or ExtensionId, skipping...", fqcn)
case Left(problem) log.error(problem, "While trying to load extension '{}', skipping...", fqcn)
case Right(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
case Left(problem) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
}
}

View file

@ -193,7 +193,7 @@ trait IO {
private def run() {
_next match {
case ByteStringLength(continuation, handle, message, waitingFor)
context.currentMessage = message
context.asInstanceOf[ActorCell].currentMessage = message
val st = state(handle)
if (st.readBytes.length >= waitingFor) {
val bytes = st.readBytes.take(waitingFor) //.compact
@ -202,7 +202,7 @@ trait IO {
run()
}
case bsd @ ByteStringDelimited(continuation, handle, message, delimiter, inclusive, scanned)
context.currentMessage = message
context.asInstanceOf[ActorCell].currentMessage = message
val st = state(handle)
val idx = st.readBytes.indexOfSlice(delimiter, scanned)
if (idx >= 0) {
@ -215,7 +215,7 @@ trait IO {
_next = bsd.copy(scanned = math.min(idx - delimiter.length, 0))
}
case ByteStringAny(continuation, handle, message)
context.currentMessage = message
context.asInstanceOf[ActorCell].currentMessage = message
val st = state(handle)
if (st.readBytes.length > 0) {
val bytes = st.readBytes //.compact

View file

@ -221,7 +221,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
import akka.actor.ReceiveTimeout
import akka.util.duration._
class MyActor extends Actor {
context.receiveTimeout = Some(30 seconds)
context.setReceiveTimeout(30 milliseconds)
def receive = {
case "Hello" //...
case ReceiveTimeout throw new RuntimeException("received timeout")