diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 547ffa12e9..4b95091053 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -42,12 +42,11 @@ class TypedActorPoolSpec extends AkkaSpec { def receive = _route }, Props().withTimeout(10 seconds).withFaultHandler(faultHandler)) - val results = for (i ← 1 to 20) yield (i, pool.sq(i, 10)) + val results = for (i ← 1 to 100) yield (i, pool.sq(i, 0)) + + for ((i, r) ← results) + r.get must equal(i * i) - for ((i, r) ← results) { - val value = r.get - value must equal(i * i) - } app.typedActor.stop(pool) } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 6820d12e8a..f15c8c89aa 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -223,10 +223,10 @@ trait Actor { "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") if (contextStack.isEmpty) noContextError - val context = contextStack.head - if (context eq null) noContextError + val c = contextStack.head + if (c eq null) noContextError ActorCell.contextStack.set(contextStack.push(null)) - context + c } implicit def app = context.app @@ -252,22 +252,6 @@ trait Actor { */ def loggable(self: AnyRef)(r: Receive): Receive = if (app.AkkaConfig.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait? - /** - * Some[ActorRef] representation of the 'self' ActorRef reference. - *

- * Mainly for internal use, functions as the implicit sender references when invoking - * the 'forward' function. - */ - def someSelf: Some[ActorRef with ScalaActorRef] = Some(context.self) //TODO FIXME we might not need this when we switch to sender-in-scope-always - - /* - * Option[ActorRef] representation of the 'self' ActorRef reference. - *

- * Mainly for internal use, functions as the implicit sender references when invoking - * one of the message send functions ('!' and '?'). - */ - def optionSelf: Option[ActorRef with ScalaActorRef] = someSelf //TODO FIXME we might not need this when we switch to sender-in-scope-always - /** * The 'self' field holds the ActorRef for this actor. *

@@ -276,14 +260,14 @@ trait Actor { * self ! message * */ - implicit def self = someSelf.get + implicit final val self = context.self /** * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ @inline - final def sender: ActorRef = context.sender + final def sender: ActorRef = context.sender //MUST BE A VAL, TRUST ME /** * Gets the current receive timeout diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4df2834142..2186dc6e1a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -193,7 +193,7 @@ private[akka] class ActorCell( try { failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) } finally { - clearActorContext() + clearActorFields() currentMessage = null actor = null } @@ -360,7 +360,7 @@ private[akka] class ActorCell( if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped")) } finally { currentMessage = null - clearActorContext() + clearActorFields() } } } @@ -396,15 +396,17 @@ private[akka] class ActorCell( } } - final def clearActorContext(): Unit = setActorContext(null) + final def clearActorFields(): Unit = setActorFields(context = null, self = null) - final def setActorContext(newContext: ActorContext) { + final def setActorFields(context: ActorContext, self: ActorRef) { @tailrec - def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, newContext: ActorContext): Boolean = { + def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = { val success = try { - val contextField = clazz.getDeclaredField("context") - contextField.setAccessible(true) - contextField.set(actor, newContext) + val field = clazz.getDeclaredField(name) + val was = field.isAccessible + field.setAccessible(true) + field.set(actor, value) + field.setAccessible(was) true } catch { case e: NoSuchFieldException ⇒ false @@ -413,13 +415,14 @@ private[akka] class ActorCell( if (success) true else { val parent = clazz.getSuperclass - if (parent eq null) - throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent, actor, newContext) + if (parent eq null) throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + lookupAndSetField(parent, actor, name, value) } } val a = actor - if (a ne null) - lookupAndSetSelfFields(a.getClass, a, newContext) + if (a ne null) { + lookupAndSetField(a.getClass, a, "context", context) + lookupAndSetField(a.getClass, a, "self", self) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 26a3ef7eed..5f4ff06f37 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -73,7 +73,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * actor.tell(message); * */ - def tell(msg: Any): Unit = this.!(msg) + final def tell(msg: Any): Unit = this.!(msg)(null: ActorRef) /** * Java API.

@@ -84,7 +84,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * actor.tell(message, context); * */ - def tell(msg: Any, sender: ActorRef): Unit + final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender) /** * Akka Java API.

@@ -240,7 +240,7 @@ class LocalActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) } - def tell(msg: Any, sender: ActorRef): Unit = actorCell.tell(msg, sender) + def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout) @@ -273,7 +273,7 @@ trait ScalaActorRef { ref: ActorRef ⇒ * *

*/ - def !(message: Any)(implicit sender: ActorRef = null): Unit = ref.tell(message, sender) + def !(message: Any)(implicit sender: ActorRef = null): Unit /** * Sends a message asynchronously, returning a future which may eventually hold the reply. @@ -331,7 +331,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () - def tell(msg: Any, sender: ActorRef): Unit = () + def !(message: Any)(implicit sender: ActorRef = null): Unit = () def ?(message: Any)(implicit timeout: Timeout): Future[Any] = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) @@ -351,17 +351,17 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { def suspend(): Unit = () def resume(): Unit = () - protected[akka] def restart(cause: Throwable): Unit = () def stop(): Unit = () def isShutdown = false - protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () - - def tell(msg: Any, sender: ActorRef): Unit = () + def !(message: Any)(implicit sender: ActorRef = null): Unit = () def ?(message: Any)(implicit timeout: Timeout): Future[Any] = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) + + protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () + protected[akka] def restart(cause: Throwable): Unit = () } case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) @@ -387,9 +387,9 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef { override def isShutdown(): Boolean = true - override def tell(msg: Any, sender: ActorRef): Unit = msg match { + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ app.eventStream.publish(d) - case _ ⇒ app.eventStream.publish(DeadLetter(msg, sender, this)) + case _ ⇒ app.eventStream.publish(DeadLetter(message, sender, this)) } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { @@ -417,7 +417,7 @@ abstract class AskActorRef(protected val app: ActorSystem)(timeout: Timeout = ap protected def whenDone(): Unit - override def tell(msg: Any, sender: ActorRef): Unit = msg match { + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case Status.Success(r) ⇒ result.completeWithResult(r) case Status.Failure(f) ⇒ result.completeWithException(f) case other ⇒ result.completeWithResult(other) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0bc355cf69..b8e4f39b1d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -149,10 +149,10 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { def isShutdown = stopped - override def tell(msg: Any, sender: ActorRef): Unit = msg match { + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case Failed(ex) ⇒ sender.stop() case ChildTerminated ⇒ terminationFuture.completeWithResult(ActorSystem.Stopped) - case _ ⇒ log.error(this + " received unexpected message " + msg) + case _ ⇒ log.error(this + " received unexpected message " + message) } protected[akka] override def sendSystemMessage(message: SystemMessage) { @@ -276,11 +276,12 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { - import akka.dispatch.{ Future, Promise, DefaultPromise } + import akka.dispatch.DefaultPromise (if (within == null) app.AkkaConfig.ActorTimeout else within) match { - case t if t.duration.length <= 0 ⇒ new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout + case t if t.duration.length <= 0 ⇒ + new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout case t ⇒ - val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this) } + val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this.path.toString) } assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble recipient.tell(message, a) a.result diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index f443be5a2f..abc3df50d2 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -334,9 +334,9 @@ class TypedActor(val app: ActorSystem) { try { if (m.isOneWay) m(me) else { - val s = sender try { if (m.returnsFuture_?) { + val s = sender m(me).asInstanceOf[Future[Any]] onComplete { _.value.get match { case Left(f) ⇒ s ! Status.Failure(f) @@ -344,10 +344,10 @@ class TypedActor(val app: ActorSystem) { } } } else { - s ! m(me) + sender ! m(me) } } catch { - case e: Exception ⇒ s ! Status.Failure(e) + case e: Exception ⇒ sender ! Status.Failure(e) } } } finally { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 990f3832f1..b418ca7994 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -411,7 +411,7 @@ object Future { try { next.apply() } catch { - case e ⇒ // TODO FIXME: Throwable or Exception, log or do what? + case e ⇒ e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks } } } finally { _taskStack set None } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 966b7a534e..0f26437df3 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -369,7 +369,7 @@ object Logging { val path: ActorPath = null // pathless val address: String = name override val toString = "StandardOutLogger" - override def tell(obj: Any, sender: ActorRef) { print(obj) } + override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) } val StandardOutLogger = new StandardOutLogger val StandardOutLoggerName = StandardOutLogger.getClass.getName diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 0fa194f0c0..88400e4ffa 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -158,7 +158,7 @@ object Routing { abstract private[akka] class AbstractRoutedActorRef(val app: ActorSystem, val props: RoutedProps) extends UnsupportedActorRef { val router = props.routerFactory() - override def tell(message: Any, sender: ActorRef) = router.route(message)(sender) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = router.route(message)(sender) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = router.route(message, timeout) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1eeeb2ccf5..cb7261104f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -271,9 +271,9 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported - def tell(message: Any, sender: ActorRef): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) def suspend(): Unit = ()