It is with great pleasure I announce that all tests are green, I now challenge thee, Jenkins, to repeat it for me.

This commit is contained in:
Viktor Klang 2011-11-14 11:18:17 +01:00
parent d978758c8e
commit 31fbe76e06
10 changed files with 51 additions and 64 deletions

View file

@ -42,12 +42,11 @@ class TypedActorPoolSpec extends AkkaSpec {
def receive = _route def receive = _route
}, Props().withTimeout(10 seconds).withFaultHandler(faultHandler)) }, Props().withTimeout(10 seconds).withFaultHandler(faultHandler))
val results = for (i 1 to 20) yield (i, pool.sq(i, 10)) val results = for (i 1 to 100) yield (i, pool.sq(i, 0))
for ((i, r) results)
r.get must equal(i * i)
for ((i, r) results) {
val value = r.get
value must equal(i * i)
}
app.typedActor.stop(pool) app.typedActor.stop(pool)
} }
} }

View file

@ -223,10 +223,10 @@ trait Actor {
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
if (contextStack.isEmpty) noContextError if (contextStack.isEmpty) noContextError
val context = contextStack.head val c = contextStack.head
if (context eq null) noContextError if (c eq null) noContextError
ActorCell.contextStack.set(contextStack.push(null)) ActorCell.contextStack.set(contextStack.push(null))
context c
} }
implicit def app = context.app implicit def app = context.app
@ -252,22 +252,6 @@ trait Actor {
*/ */
def loggable(self: AnyRef)(r: Receive): Receive = if (app.AkkaConfig.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait? def loggable(self: AnyRef)(r: Receive): Receive = if (app.AkkaConfig.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait?
/**
* Some[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
def someSelf: Some[ActorRef with ScalaActorRef] = Some(context.self) //TODO FIXME we might not need this when we switch to sender-in-scope-always
/*
* Option[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!' and '?').
*/
def optionSelf: Option[ActorRef with ScalaActorRef] = someSelf //TODO FIXME we might not need this when we switch to sender-in-scope-always
/** /**
* The 'self' field holds the ActorRef for this actor. * The 'self' field holds the ActorRef for this actor.
* <p/> * <p/>
@ -276,14 +260,14 @@ trait Actor {
* self ! message * self ! message
* </pre> * </pre>
*/ */
implicit def self = someSelf.get implicit final val self = context.self
/** /**
* The reference sender Actor of the last received message. * The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None. * Is defined if the message was sent from another Actor, else None.
*/ */
@inline @inline
final def sender: ActorRef = context.sender final def sender: ActorRef = context.sender //MUST BE A VAL, TRUST ME
/** /**
* Gets the current receive timeout * Gets the current receive timeout

View file

@ -193,7 +193,7 @@ private[akka] class ActorCell(
try { try {
failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
} finally { } finally {
clearActorContext() clearActorFields()
currentMessage = null currentMessage = null
actor = null actor = null
} }
@ -360,7 +360,7 @@ private[akka] class ActorCell(
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped")) if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped"))
} finally { } finally {
currentMessage = null currentMessage = null
clearActorContext() clearActorFields()
} }
} }
} }
@ -396,15 +396,17 @@ private[akka] class ActorCell(
} }
} }
final def clearActorContext(): Unit = setActorContext(null) final def clearActorFields(): Unit = setActorFields(context = null, self = null)
final def setActorContext(newContext: ActorContext) { final def setActorFields(context: ActorContext, self: ActorRef) {
@tailrec @tailrec
def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, newContext: ActorContext): Boolean = { def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = {
val success = try { val success = try {
val contextField = clazz.getDeclaredField("context") val field = clazz.getDeclaredField(name)
contextField.setAccessible(true) val was = field.isAccessible
contextField.set(actor, newContext) field.setAccessible(true)
field.set(actor, value)
field.setAccessible(was)
true true
} catch { } catch {
case e: NoSuchFieldException false case e: NoSuchFieldException false
@ -413,13 +415,14 @@ private[akka] class ActorCell(
if (success) true if (success) true
else { else {
val parent = clazz.getSuperclass val parent = clazz.getSuperclass
if (parent eq null) if (parent eq null) throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") lookupAndSetField(parent, actor, name, value)
lookupAndSetSelfFields(parent, actor, newContext)
} }
} }
val a = actor val a = actor
if (a ne null) if (a ne null) {
lookupAndSetSelfFields(a.getClass, a, newContext) lookupAndSetField(a.getClass, a, "context", context)
lookupAndSetField(a.getClass, a, "self", self)
}
} }
} }

View file

@ -73,7 +73,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* actor.tell(message); * actor.tell(message);
* </pre> * </pre>
*/ */
def tell(msg: Any): Unit = this.!(msg) final def tell(msg: Any): Unit = this.!(msg)(null: ActorRef)
/** /**
* Java API. <p/> * Java API. <p/>
@ -84,7 +84,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* actor.tell(message, context); * actor.tell(message, context);
* </pre> * </pre>
*/ */
def tell(msg: Any, sender: ActorRef): Unit final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
/** /**
* Akka Java API. <p/> * Akka Java API. <p/>
@ -240,7 +240,7 @@ class LocalActorRef private[akka] (
protected[akka] def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) } protected[akka] def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) }
def tell(msg: Any, sender: ActorRef): Unit = actorCell.tell(msg, sender) def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout) def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout)
@ -273,7 +273,7 @@ trait ScalaActorRef { ref: ActorRef ⇒
* </pre> * </pre>
* <p/> * <p/>
*/ */
def !(message: Any)(implicit sender: ActorRef = null): Unit = ref.tell(message, sender) def !(message: Any)(implicit sender: ActorRef = null): Unit
/** /**
* Sends a message asynchronously, returning a future which may eventually hold the reply. * Sends a message asynchronously, returning a future which may eventually hold the reply.
@ -331,7 +331,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () protected[akka] def sendSystemMessage(message: SystemMessage): Unit = ()
def tell(msg: Any, sender: ActorRef): Unit = () def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
@ -351,17 +351,17 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef {
def suspend(): Unit = () def suspend(): Unit = ()
def resume(): Unit = () def resume(): Unit = ()
protected[akka] def restart(cause: Throwable): Unit = ()
def stop(): Unit = () def stop(): Unit = ()
def isShutdown = false def isShutdown = false
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
def tell(msg: Any, sender: ActorRef): Unit = ()
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = ()
protected[akka] def restart(cause: Throwable): Unit = ()
} }
case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
@ -387,9 +387,9 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef {
override def isShutdown(): Boolean = true override def isShutdown(): Boolean = true
override def tell(msg: Any, sender: ActorRef): Unit = msg match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter app.eventStream.publish(d) case d: DeadLetter app.eventStream.publish(d)
case _ app.eventStream.publish(DeadLetter(msg, sender, this)) case _ app.eventStream.publish(DeadLetter(message, sender, this))
} }
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
@ -417,7 +417,7 @@ abstract class AskActorRef(protected val app: ActorSystem)(timeout: Timeout = ap
protected def whenDone(): Unit protected def whenDone(): Unit
override def tell(msg: Any, sender: ActorRef): Unit = msg match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case Status.Success(r) result.completeWithResult(r) case Status.Success(r) result.completeWithResult(r)
case Status.Failure(f) result.completeWithException(f) case Status.Failure(f) result.completeWithException(f)
case other result.completeWithResult(other) case other result.completeWithResult(other)

View file

@ -149,10 +149,10 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
def isShutdown = stopped def isShutdown = stopped
override def tell(msg: Any, sender: ActorRef): Unit = msg match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case Failed(ex) sender.stop() case Failed(ex) sender.stop()
case ChildTerminated terminationFuture.completeWithResult(ActorSystem.Stopped) case ChildTerminated terminationFuture.completeWithResult(ActorSystem.Stopped)
case _ log.error(this + " received unexpected message " + msg) case _ log.error(this + " received unexpected message " + message)
} }
protected[akka] override def sendSystemMessage(message: SystemMessage) { protected[akka] override def sendSystemMessage(message: SystemMessage) {
@ -276,11 +276,12 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
import akka.dispatch.{ Future, Promise, DefaultPromise } import akka.dispatch.DefaultPromise
(if (within == null) app.AkkaConfig.ActorTimeout else within) match { (if (within == null) app.AkkaConfig.ActorTimeout else within) match {
case t if t.duration.length <= 0 new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout case t if t.duration.length <= 0
new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout
case t case t
val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this) } val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this.path.toString) }
assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble
recipient.tell(message, a) recipient.tell(message, a)
a.result a.result

View file

@ -334,9 +334,9 @@ class TypedActor(val app: ActorSystem) {
try { try {
if (m.isOneWay) m(me) if (m.isOneWay) m(me)
else { else {
val s = sender
try { try {
if (m.returnsFuture_?) { if (m.returnsFuture_?) {
val s = sender
m(me).asInstanceOf[Future[Any]] onComplete { m(me).asInstanceOf[Future[Any]] onComplete {
_.value.get match { _.value.get match {
case Left(f) s ! Status.Failure(f) case Left(f) s ! Status.Failure(f)
@ -344,10 +344,10 @@ class TypedActor(val app: ActorSystem) {
} }
} }
} else { } else {
s ! m(me) sender ! m(me)
} }
} catch { } catch {
case e: Exception s ! Status.Failure(e) case e: Exception sender ! Status.Failure(e)
} }
} }
} finally { } finally {

View file

@ -411,7 +411,7 @@ object Future {
try { try {
next.apply() next.apply()
} catch { } catch {
case e // TODO FIXME: Throwable or Exception, log or do what? case e e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
} }
} }
} finally { _taskStack set None } } finally { _taskStack set None }

View file

@ -369,7 +369,7 @@ object Logging {
val path: ActorPath = null // pathless val path: ActorPath = null // pathless
val address: String = name val address: String = name
override val toString = "StandardOutLogger" override val toString = "StandardOutLogger"
override def tell(obj: Any, sender: ActorRef) { print(obj) } override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
} }
val StandardOutLogger = new StandardOutLogger val StandardOutLogger = new StandardOutLogger
val StandardOutLoggerName = StandardOutLogger.getClass.getName val StandardOutLoggerName = StandardOutLogger.getClass.getName

View file

@ -158,7 +158,7 @@ object Routing {
abstract private[akka] class AbstractRoutedActorRef(val app: ActorSystem, val props: RoutedProps) extends UnsupportedActorRef { abstract private[akka] class AbstractRoutedActorRef(val app: ActorSystem, val props: RoutedProps) extends UnsupportedActorRef {
val router = props.routerFactory() val router = props.routerFactory()
override def tell(message: Any, sender: ActorRef) = router.route(message)(sender) override def !(message: Any)(implicit sender: ActorRef = null): Unit = router.route(message)(sender)
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = router.route(message, timeout) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = router.route(message, timeout)
} }

View file

@ -271,9 +271,9 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
def tell(message: Any, sender: ActorRef): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
def suspend(): Unit = () def suspend(): Unit = ()