diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index d0388cb9c0..70f45e4c8f 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -27,7 +27,7 @@ public class JavaAPI { @Test void mustAcceptSingleArgTryTell() { ActorRef ref = app.actorOf(JavaAPITestActor.class); - ref.tryTell("hallo"); - ref.tryTell("hallo", ref); + ref.tell("hallo"); + ref.tell("hallo", ref); } } diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java index 4952d1b2c9..7b4c5a48bb 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java @@ -2,6 +2,6 @@ package akka.actor; public class JavaAPITestActor extends UntypedActor { public void onReceive(Object msg) { - getChannel().tryTell("got it!"); + getSender().tell("got it!"); } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 2a327a35d9..727d2bbbec 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -15,9 +15,9 @@ object ActorFireForgetRequestReplySpec { class ReplyActor extends Actor { def receive = { case "Send" ⇒ - channel ! "Reply" + sender ! "Reply" case "SendImplicit" ⇒ - channel ! "ReplyImplicit" + sender ! "ReplyImplicit" } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index aca8ae829b..23373a8af6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -26,7 +26,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val currentGen = generationProvider.getAndIncrement() override def preStart() { report("preStart") } override def postStop() { report("postStop") } - def receive = { case "status" ⇒ channel ! message("OK") } + def receive = { case "status" ⇒ sender ! message("OK") } } "An Actor" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 5fdf0487e5..d4ffc2a517 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -18,24 +18,24 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } object ActorRefSpec { - case class ReplyTo(channel: Channel[Any]) + case class ReplyTo(sender: ActorRef) val latch = TestLatch(4) class ReplyActor extends Actor { - var replyTo: Channel[Any] = null + var replyTo: ActorRef = null def receive = { case "complexRequest" ⇒ { - replyTo = channel + replyTo = sender val worker = context.actorOf(Props[WorkerActor]) worker ! "work" } case "complexRequest2" ⇒ val worker = context.actorOf(Props[WorkerActor]) - worker ! ReplyTo(channel) + worker ! ReplyTo(sender) case "workDone" ⇒ replyTo ! "complexReply" - case "simpleRequest" ⇒ channel ! "simpleReply" + case "simpleRequest" ⇒ sender ! "simpleReply" } } @@ -43,7 +43,7 @@ object ActorRefSpec { def receive = { case "work" ⇒ { work - channel ! "workDone" + sender ! "workDone" self.stop() } case ReplyTo(replyTo) ⇒ { @@ -74,7 +74,7 @@ object ActorRefSpec { class OuterActor(val inner: ActorRef) extends Actor { def receive = { - case "self" ⇒ channel ! self + case "self" ⇒ sender ! self case x ⇒ inner forward x } } @@ -83,7 +83,7 @@ object ActorRefSpec { val fail = new InnerActor def receive = { - case "self" ⇒ channel ! self + case "self" ⇒ sender ! self case x ⇒ inner forward x } } @@ -94,8 +94,8 @@ object ActorRefSpec { class InnerActor extends Actor { def receive = { - case "innerself" ⇒ channel ! self - case other ⇒ channel ! other + case "innerself" ⇒ sender ! self + case other ⇒ sender ! other } } @@ -103,8 +103,8 @@ object ActorRefSpec { val fail = new InnerActor def receive = { - case "innerself" ⇒ channel ! self - case other ⇒ channel ! other + case "innerself" ⇒ sender ! self + case other ⇒ sender ! other } } @@ -322,7 +322,7 @@ class ActorRefSpec extends AkkaSpec { "support nested actorOfs" in { val a = actorOf(new Actor { val nested = actorOf(new Actor { def receive = { case _ ⇒ } }) - def receive = { case _ ⇒ channel ! nested } + def receive = { case _ ⇒ sender ! nested } }) val nested = (a ? "any").as[ActorRef].get @@ -342,7 +342,7 @@ class ActorRefSpec extends AkkaSpec { (a ? "msg").as[String] must be === Some("msg") } - "support reply via channel" in { + "support reply via sender" in { val serverRef = actorOf(Props[ReplyActor]) val clientRef = actorOf(Props(new SenderActor(serverRef))) @@ -370,8 +370,8 @@ class ActorRefSpec extends AkkaSpec { val timeout = Timeout(20000) val ref = actorOf(Props(new Actor { def receive = { - case 5 ⇒ channel.tryTell("five") - case null ⇒ channel.tryTell("null") + case 5 ⇒ sender.tell("five") + case null ⇒ sender.tell("null") } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 492a3e6680..5b0edc835e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -101,7 +101,7 @@ object Chameneos { } } else { waitingChameneo.foreach(_ ! Exit) - channel ! Exit + sender ! Exit } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ChannelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ChannelSpec.scala deleted file mode 100644 index 8461b4f39c..0000000000 --- a/akka-actor-tests/src/test/scala/akka/actor/ChannelSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.actor - -import akka.dispatch._ -import akka.testkit.TestActorRef -import akka.testkit.AkkaSpec - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ChannelSpec extends AkkaSpec { - - "A Channel" must { - - "be contravariant" in { - val ap = new ActorPromise(1000) - val p: Promise[Any] = ap - val c: Channel[Any] = ap - val cs: Channel[String] = c - } - - "find implicit sender actors" in { - var s: (String, UntypedChannel) = null - val ch = new Channel[String] { - def !(msg: String)(implicit sender: UntypedChannel) = { s = (msg, sender) } - } - val a = TestActorRef(new Actor { - def receive = { - case str: String ⇒ ch ! str - } - }) - a ! "hallo" - s must be(("hallo", a)) - - { - implicit val actor = a - ch tryTell "buh" - } - s must be(("buh", a)) - ch.!("world")(a) - s must be(("world", a)) - ch.tryTell("bippy")(a) - s must be(("bippy", a)) - } - - } - -} diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 485de60d42..db51993b34 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -82,7 +82,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2)))) - val terminalProps = Props(context ⇒ { case x ⇒ context.channel ! x }) + val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = (supervisor ? terminalProps).as[ActorRef].get val monitor = actorOf(Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index 1aff230560..1390cbf965 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -15,7 +15,7 @@ object ForwardActorSpec { def createForwardingChain(app: AkkaApplication): ActorRef = { val replier = app.actorOf(new Actor { - def receive = { case x ⇒ channel ! x } + def receive = { case x ⇒ sender ! x } }) def mkforwarder(forwardTo: ActorRef) = app.actorOf( diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 107df964ae..d4f08e40c2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -48,7 +48,7 @@ object IOActorSpec { def receiveIO = { case length: Int ⇒ val bytes = socket.read(length) - channel ! bytes + sender ! bytes } } } @@ -108,9 +108,9 @@ object IOActorSpec { case msg: NewClient ⇒ createWorker forward msg case ('set, key: String, value: ByteString) ⇒ kvs += (key -> value) - channel.tryTell(())(self) - case ('get, key: String) ⇒ channel.tryTell(kvs.get(key))(self) - case 'getall ⇒ channel.tryTell(kvs)(self) + sender.tell((), self) + case ('get, key: String) ⇒ sender.tell(kvs.get(key), self) + case 'getall ⇒ sender.tell(kvs, self) } } @@ -123,7 +123,7 @@ object IOActorSpec { socket = connect(ioManager, host, port) } - def reply(msg: Any) = channel.tryTell(msg)(self) + def reply(msg: Any) = sender.tell(msg, self) def receiveIO = { case ('set, key: String, value: ByteString) ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 5ea68924d1..170183d2c8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -62,7 +62,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd app.eventHandler.addListener(testActor) val actor = TestActorRef(new Actor { def receive = loggable(this) { - case _ ⇒ channel ! "x" + case _ ⇒ sender ! "x" } }) actor ! "buh" @@ -91,7 +91,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd app.eventHandler.addListener(testActor) val actor = TestActorRef(new Actor { def receive = loggable(this)(loggable(this) { - case _ ⇒ channel ! "x" + case _ ⇒ sender ! "x" }) }) actor ! "buh" diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 1fa42ac61b..38a223d29a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -210,7 +210,7 @@ class RestartStrategySpec extends AkkaSpec { val boss = actorOf(Props(new Actor { def receive = { - case p: Props ⇒ channel ! context.actorOf(p) + case p: Props ⇒ sender ! context.actorOf(p) case t: Terminated ⇒ maxNoOfRestartsLatch.open } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala index 14c70933f7..6c438f1776 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -5,6 +5,6 @@ package akka.actor class Supervisor extends Actor { def receive = { - case x: Props ⇒ channel ! context.actorOf(x) + case x: Props ⇒ sender ! context.actorOf(x) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 860478f862..567acfd6f8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -13,7 +13,7 @@ object SupervisorHierarchySpec { class CountDownActor(countDown: CountDownLatch) extends Actor { protected def receive = { - case p: Props ⇒ channel ! context.actorOf(p) + case p: Props ⇒ sender ! context.actorOf(p) } override def postRestart(reason: Throwable) = { countDown.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index e33b7ab878..99068ed76e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -22,7 +22,7 @@ class SupervisorMiscSpec extends AkkaSpec { val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { - case "status" ⇒ this.channel ! "OK" + case "status" ⇒ this.sender ! "OK" case _ ⇒ this.self.stop() } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 20b1f92aea..c38290eeec 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -39,7 +39,7 @@ object SupervisorSpec { def receive = { case Ping ⇒ messageLog.put(PingMessage) - channel.tryTell(PongMessage) + sender.tell(PongMessage) case Die ⇒ throw new RuntimeException(ExceptionMessage) } @@ -53,10 +53,10 @@ object SupervisorSpec { val temp = context.actorOf(Props[PingPongActor]) self startsMonitoring temp - var s: UntypedChannel = _ + var s: ActorRef = _ def receive = { - case Die ⇒ temp ! Die; s = context.channel + case Die ⇒ temp ! Die; s = sender case Terminated(`temp`) ⇒ s ! "terminated" } } @@ -294,7 +294,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!") def receive = { - case Ping ⇒ channel.tryTell(PongMessage) + case Ping ⇒ sender.tell(PongMessage) case Die ⇒ throw new RuntimeException("Expected") } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index e5b6283c36..f2c3d36081 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -23,7 +23,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender { within(5 seconds) { val p = Props(new Actor { def receive = { - case p: Props ⇒ channel ! context.actorOf(p) + case p: Props ⇒ sender ! context.actorOf(p) } override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.address } }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index c864af36fa..758f44848b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -25,7 +25,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000))) val supervised = (supervisor ? Props[Supervised]).as[ActorRef].get - supervised.!("test")(Some(testActor)) + supervised.!("test")(testActor) expectMsg("failure1") supervisor.stop() } @@ -36,7 +36,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None))) val supervised = (supervisor ? Props[Supervised]).as[ActorRef].get - supervised.!("test")(Some(testActor)) + supervised.!("test")(testActor) expectMsg("failure2") supervisor.stop() } @@ -51,11 +51,11 @@ object Ticket669Spec { } override def preRestart(reason: scala.Throwable, msg: Option[Any]) { - channel.tryTell("failure1") + sender.tell("failure1") } override def postStop() { - channel.tryTell("failure2") + sender.tell("failure2") } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 4727825b9f..2af08fbe6f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -263,7 +263,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { val boss = actorOf(Props(context ⇒ { - case p: Props ⇒ context.channel ! context.typedActorOf(classOf[Foo], classOf[Bar], p) + case p: Props ⇒ context.sender ! context.typedActorOf(classOf[Foo], classOf[Bar], p) }).withFaultHandler(OneForOneStrategy { case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 318120a2a4..50b1b69838 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -71,8 +71,8 @@ object ActorModelSpec { case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() - case Reply(msg) ⇒ ack; channel ! msg; busy.switchOff() - case TryReply(msg) ⇒ ack; channel.tryTell(msg); busy.switchOff() + case Reply(msg) ⇒ ack; sender ! msg; busy.switchOff() + case TryReply(msg) ⇒ ack; sender.tell(msg); busy.switchOff() case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 02fa4b0689..2ce2171438 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -9,7 +9,7 @@ import akka.actor.{ Props, Actor } object DispatcherActorSpec { class TestActor extends Actor { def receive = { - case "Hello" ⇒ channel ! "World" + case "Hello" ⇒ sender ! "World" case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance") } } @@ -46,20 +46,6 @@ class DispatcherActorSpec extends AkkaSpec { actor.stop() } - "support ask/exception" in { - filterEvents(EventFilter[RuntimeException]("Expected")) { - val actor = actorOf(Props[TestActor].withDispatcher(app.dispatcherFactory.newDispatcher("test").build)) - try { - (actor ? "Failure").get - fail("Should have thrown an exception") - } catch { - case e ⇒ - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop() - } - } - "respect the throughput setting" in { val throughputDispatcher = app.dispatcherFactory. newDispatcher("THROUGHPUT", 101, 0, app.dispatcherFactory.MailboxType). @@ -74,7 +60,7 @@ class DispatcherActorSpec extends AkkaSpec { val slowOne = actorOf( Props(context ⇒ { - case "hogexecutor" ⇒ context.channel ! "OK"; start.await + case "hogexecutor" ⇒ context.sender ! "OK"; start.await case "ping" ⇒ if (works.get) latch.countDown() }).withDispatcher(throughputDispatcher)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index a2f0a785de..fc0e240a50 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -12,7 +12,7 @@ import org.scalatest.BeforeAndAfterEach object PinnedActorSpec { class TestActor extends Actor { def receive = { - case "Hello" ⇒ channel ! "World" + case "Hello" ⇒ sender ! "World" case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance") } } @@ -48,18 +48,5 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { assert("World" === result.get) actor.stop() } - - "support ask/exception" in { - val actor = actorOf(Props[TestActor].withDispatcher(app.dispatcherFactory.newPinnedDispatcher("test"))) - app.eventHandler.notify(Mute(EventFilter[RuntimeException]("Expected exception; to test fault-tolerance"))) - try { - (actor ? "Failure").get - fail("Should have thrown an exception") - } catch { - case e ⇒ - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop() - } } } diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index 948af64106..227a29ddc2 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -21,8 +21,8 @@ class Future2ActorSpec extends AkkaSpec { "support reply via channel" in { val actor = app.actorOf(Props(new Actor { def receive = { - case "do" ⇒ Future(31) pipeTo context.channel - case "ex" ⇒ Future(throw new AssertionError) pipeTo context.channel + case "do" ⇒ Future(31) pipeTo context.sender + case "ex" ⇒ Future(throw new AssertionError) pipeTo context.sender } })) (actor ? "do").as[Int] must be(Some(31)) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 3b6b147ec5..b2505f4a41 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -17,7 +17,7 @@ import org.scalatest.junit.JUnitSuite object FutureSpec { class TestActor extends Actor { def receive = { - case "Hello" ⇒ channel ! "World" + case "Hello" ⇒ sender ! "World" case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance") case "NoReply" ⇒ } @@ -25,7 +25,7 @@ object FutureSpec { class TestDelayActor(await: StandardLatch) extends Actor { def receive = { - case "Hello" ⇒ await.await; channel ! "World" + case "Hello" ⇒ await.await; sender ! "World" case "NoReply" ⇒ await.await case "Failure" ⇒ await.await @@ -137,7 +137,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { "will return a result" must { behave like futureWithResult { test ⇒ val actor1 = actorOf[TestActor] - val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.toUpperCase } }) + val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "WORLD") @@ -149,7 +149,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { behave like futureWithException[ArithmeticException] { test ⇒ filterException[ArithmeticException] { val actor1 = actorOf[TestActor] - val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.length / 0 } }) + val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.length / 0 } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "/ by zero") @@ -162,7 +162,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { behave like futureWithException[MatchError] { test ⇒ filterException[MatchError] { val actor1 = actorOf[TestActor] - val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.toUpperCase } }) + val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } future.await test(future, "World (of class java.lang.String)") @@ -179,8 +179,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { filterException[ClassCastException] { val actor = actorOf(new Actor { def receive = { - case s: String ⇒ channel ! s.length - case i: Int ⇒ channel ! (i * 2).toString + case s: String ⇒ sender ! s.length + case i: Int ⇒ sender ! (i * 2).toString } }) @@ -211,8 +211,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { case class Res[T](res: T) val actor = actorOf(new Actor { def receive = { - case Req(s: String) ⇒ channel ! Res(s.length) - case Req(i: Int) ⇒ channel ! Res((i * 2).toString) + case Req(s: String) ⇒ sender ! Res(s.length) + case Req(i: Int) ⇒ sender ! Res((i * 2).toString) } }) @@ -298,7 +298,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { "fold" in { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) } }) } val timeout = 10000 @@ -309,7 +309,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { "fold by composing" in { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) } }) } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } @@ -324,7 +324,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { case (add: Int, wait: Int) ⇒ Thread.sleep(wait) if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - channel.tryTell(add) + sender.tell(add) } }) } @@ -356,7 +356,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { "shouldReduceResults" in { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) } }) } val timeout = 10000 @@ -372,7 +372,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { case (add: Int, wait: Int) ⇒ Thread.sleep(wait) if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - channel.tryTell(add) + sender.tell(add) } }) } @@ -401,7 +401,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { var counter = 1 def receive = { case 'GetNext ⇒ - channel ! counter + sender ! counter counter += 2 } }) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index a19898a502..8e1bc597d9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -8,7 +8,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } import java.util.{ Queue } import akka.util._ import akka.util.Duration._ -import akka.actor.{ LocalActorRef, Actor, NullChannel } +import akka.actor.{ LocalActorRef, Actor } import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -80,7 +80,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn result } - def createMessageInvocation(msg: Any): Envelope = Envelope(msg, NullChannel) + def createMessageInvocation(msg: Any): Envelope = Envelope(msg, app.deadLetters) def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { q must not be null diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 0c599937d2..ebc42c92d9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -30,7 +30,7 @@ class PriorityDispatcherSpec extends AkkaSpec { def receive = { case i: Int ⇒ acc = i :: acc - case 'Result ⇒ channel.tryTell(acc) + case 'Result ⇒ sender.tell(acc) } }).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef] diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index 4861dd9ea5..4c43e17d83 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -61,13 +61,13 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte disposeSubscriber(sub) } - "not allow for the same subscriber to subscribe to the same channel twice" in { + "not allow for the same subscriber to subscribe to the same sender twice" in { bus.subscribe(subscriber, classifier) must be === true bus.subscribe(subscriber, classifier) must be === false bus.unsubscribe(subscriber, classifier) must be === true } - "not allow for the same subscriber to unsubscribe to the same channel twice" in { + "not allow for the same subscriber to unsubscribe to the same sender twice" in { bus.subscribe(subscriber, classifier) must be === true bus.unsubscribe(subscriber, classifier) must be === true bus.unsubscribe(subscriber, classifier) must be === false diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala index 5f2989fa97..9c88e67902 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala @@ -47,7 +47,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) } def done(status: Boolean) { - channel ! new Rsp(status) + sender ! new Rsp(status) } def waitForStandby(pendingStandbyFuture: Future[_]) { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala index c1f811b425..114fe7e349 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala @@ -41,7 +41,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver { m.forward(order) case None ⇒ app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) - channel ! new Rsp(false) + sender ! new Rsp(false) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index ad48435996..01fd35ba42 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -42,7 +42,7 @@ class ActorPoolSpec extends AkkaSpec { case _ ⇒ count.incrementAndGet latch.countDown() - channel.tryTell("success") + sender.tell("success") } })) @@ -89,7 +89,7 @@ class ActorPoolSpec extends AkkaSpec { def receive = { case req: String ⇒ { sleepFor(10 millis) - channel.tryTell("Response") + sender.tell("Response") } } })) @@ -112,7 +112,7 @@ class ActorPoolSpec extends AkkaSpec { val count = new AtomicInteger(0) val pool = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveActorsPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case n: Int ⇒ @@ -359,7 +359,7 @@ class ActorPoolSpec extends AkkaSpec { val keepDying = new AtomicBoolean(false) val pool1, pool2 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveActorsPressureCapacitor with SmallestMailboxSelector with BasicFilter { def lowerBound = 2 def upperBound = 5 def rampupRate = 0.1 @@ -382,7 +382,7 @@ class ActorPoolSpec extends AkkaSpec { }).withFaultHandler(faultHandler)) val pool3 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveActorsPressureCapacitor with RoundRobinSelector with BasicFilter { def lowerBound = 2 def upperBound = 5 def rampupRate = 0.1 @@ -480,7 +480,7 @@ class ActorPoolSpec extends AkkaSpec { object BadState val pool1 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveActorsPressureCapacitor with SmallestMailboxSelector with BasicFilter { def lowerBound = 2 def upperBound = 5 def rampupRate = 0.1 diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 0650aec50e..6d1d5e1da0 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -74,7 +74,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { val actor = app.actorOf(Props(new Actor { lazy val id = counter.getAndIncrement() def receive = { - case "hit" ⇒ channel ! id + case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } }), address) @@ -188,7 +188,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { val actor = app.actorOf(Props(new Actor { lazy val id = counter.getAndIncrement() def receive = { - case "hit" ⇒ channel ! id + case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } }), address) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 1313694260..510b6ecbef 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -459,7 +459,7 @@ class RoutingSpec extends AkkaSpec { case Stop(None) ⇒ self.stop() case Stop(Some(_id)) if (_id == id) ⇒ self.stop() case _id: Int if (_id == id) ⇒ - case _ ⇒ Thread sleep 100 * id; channel.tryTell(id) + case _ ⇒ Thread sleep 100 * id; sender.tell(id) } override def postStop = { diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 1b54c709d8..1e1768019e 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -22,7 +22,7 @@ class Ticket703Spec extends AkkaSpec { def receive = { case req: String ⇒ Thread.sleep(6000L) - channel.tryTell("Response") + sender.tell("Response") } })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a6cb36443d..2cce44bec8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -209,16 +209,15 @@ trait Actor { * Stores the context for this actor, including self, sender, and hotswap. */ @transient - private[akka] val context: ActorContext = { + private[akka] implicit val context: ActorContext = { val contextStack = ActorCell.contextStack.get - def noContextError = { + def noContextError = throw new ActorInitializationException( "\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") - } if (contextStack.isEmpty) noContextError val context = contextStack.head @@ -283,14 +282,6 @@ trait Actor { @inline final def sender: ActorRef = context.sender - /** - * Abstraction for unification of sender and senderFuture for later reply - */ - def channel: UntypedChannel = context.channel - - // TODO FIXME REMOVE ME just for current compatibility - implicit def forwardable: ForwardableChannel = ForwardableChannel(channel) - /** * Gets the current receive timeout * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. @@ -326,7 +317,7 @@ trait Actor { * def receive = { * case Ping => * println("got a 'Ping' message") - * channel ! "pong" + * sender ! "pong" * * case OneWay => * println("got a 'OneWay' message") @@ -427,10 +418,7 @@ trait Actor { case f: Failed ⇒ context.handleFailure(f) case ct: ChildTerminated ⇒ context.handleChildTerminated(ct.child) case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ - val ch = channel - self.stop() - ch.sendException(new ActorKilledException("PoisonPill")) + case PoisonPill ⇒ self.stop() } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 75c7d1f02e..6b1e8f2504 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -36,8 +36,6 @@ private[akka] trait ActorContext extends ActorRefFactory with TypedActorFactory def sender: ActorRef - def channel: UntypedChannel - def children: Iterable[ActorRef] def dispatcher: MessageDispatcher @@ -126,29 +124,12 @@ private[akka] class ActorCell( def children: Iterable[ActorRef] = _children.keys - def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher.dispatch(this, Envelope(message, channel)) - - def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - val future = channel match { - case f: ActorPromise ⇒ f - case _ ⇒ new ActorPromise(timeout)(dispatcher) - } - dispatcher.dispatch(this, Envelope(message, future)) - future - } + def postMessageToMailbox(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, sender)) def sender: ActorRef = currentMessage match { - case null ⇒ app.deadLetters - case msg if msg.channel.isInstanceOf[ActorRef] ⇒ msg.channel.asInstanceOf[ActorRef] - case _ ⇒ app.deadLetters - } - - def channel: UntypedChannel = currentMessage match { - case null ⇒ NullChannel - case msg ⇒ msg.channel + case null ⇒ app.deadLetters + case msg if msg.sender ne null ⇒ msg.sender + case _ ⇒ app.deadLetters } //This method is in charge of setting up the contextStack and create a new instance of the Actor @@ -308,12 +289,10 @@ private[akka] class ActorCell( // make sure that InterruptedException does not leave this thread if (e.isInstanceOf[InterruptedException]) { val ex = ActorInterruptedException(e) - channel.sendException(ex) props.faultHandler.handleSupervisorFailing(self, children) supervisor ! Failed(self, ex) throw e //Re-throw InterruptedExceptions as expected } else { - channel.sendException(e) props.faultHandler.handleSupervisorFailing(self, children) supervisor ! Failed(self, e) } @@ -325,9 +304,6 @@ private[akka] class ActorCell( app.eventHandler.error(e, self, e.getMessage) throw e } - } else { - messageHandle.channel sendException new ActorKilledException("Actor has been stopped") - // throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 43fb856afb..3c9d2144ec 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -41,7 +41,7 @@ import akka.event.ActorEventBus * * @author Jonas Bonér */ -abstract class ActorRef extends UntypedChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable { +abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes @@ -58,11 +58,23 @@ abstract class ActorRef extends UntypedChannel with ReplyChannel[Any] with java. def compareTo(other: ActorRef) = this.address compareTo other.address /** - * Akka Java API.

- * @see ask(message: AnyRef, sender: ActorRef): Future[_] - * Uses the specified timeout (milliseconds) + * Sends the specified message to the sender, i.e. fire-and-forget semantics.

+ *

+   * actor.tell(message);
+   * 
*/ - def ask(message: AnyRef, timeout: Long): Future[Any] = ask(message, timeout, null) + def tell(msg: Any): Unit = this.!(msg) + + /** + * Java API.

+ * Sends the specified message to the sender, i.e. fire-and-forget + * semantics, including the sender reference if possible (not supported on + * all senders).

+ *

+   * actor.tell(message, context);
+   * 
+ */ + def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender) /** * Akka Java API.

@@ -72,20 +84,17 @@ abstract class ActorRef extends UntypedChannel with ReplyChannel[Any] with java. * Use this method with care. In most cases it is better to use 'tell' together with the 'getContext().getSender()' to * implement request/response message exchanges. *

- * If you are sending messages using ask then you have to use getContext().channel().tell(...) + * If you are sending messages using ask then you have to use getContext().sender().tell(...) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] = - ?(message, Timeout(timeout))(sender).asInstanceOf[Future[AnyRef]] + def ask(message: AnyRef, timeout: Long): Future[AnyRef] = ?(message, Timeout(timeout)).asInstanceOf[Future[AnyRef]] /** - * Akka Java API.

- * Forwards the message specified to this actor and preserves the original sender of the message + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!' and '?'/'ask'. */ - def forward(message: AnyRef, sender: ActorRef) { - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else forward(message)(ForwardableChannel(sender)) - } + def forward(message: Any)(implicit context: ActorContext) = postMessageToMailbox(message, context.sender) /** * Suspends the actor. It will not process messages while suspended. @@ -222,15 +231,9 @@ class LocalActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) } - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = - actorCell.postMessageToMailbox(message, channel) + protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit = actorCell.postMessageToMailbox(message, sender) - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - actorCell.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) - } + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = app.provider.ask(message, this, timeout) protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail) @@ -251,7 +254,7 @@ class LocalActorRef private[akka] ( * There are implicit conversions in ../actor/Implicits.scala * from ActorRef -> ScalaActorRef and back */ -trait ScalaActorRef extends ReplyChannel[Any] { ref: ActorRef ⇒ +trait ScalaActorRef { ref: ActorRef ⇒ protected[akka] def sendSystemMessage(message: SystemMessage): Unit @@ -269,28 +272,16 @@ trait ScalaActorRef extends ReplyChannel[Any] { ref: ActorRef ⇒ * *

*/ - def !(message: Any)(implicit channel: UntypedChannel): Unit = postMessageToMailbox(message, channel) + def !(message: Any)(implicit sender: ActorRef = null): Unit = postMessageToMailbox(message, sender) /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - def ?(message: Any)(implicit channel: UntypedChannel, timeout: Timeout): Future[Any] = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) + def ?(message: Any)(implicit timeout: Timeout): Future[Any] - def ?(message: Any, timeout: Timeout)(implicit channel: UntypedChannel): Future[Any] = ?(message)(channel, timeout) + def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) - /** - * Forwards the message and passes the original sender actor as the sender. - *

- * Works with '!' and '?'/'ask'. - */ - def forward(message: Any)(implicit forwardable: ForwardableChannel) = postMessageToMailbox(message, forwardable.channel) - - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] + protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit protected[akka] def restart(cause: Throwable): Unit } @@ -320,31 +311,24 @@ case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, por */ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { - private[akka] def uuid: Uuid = unsupported + private[akka] final val uuid: akka.actor.Uuid = newUuid() - def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported + def startsMonitoring(actorRef: ActorRef): ActorRef = actorRef - def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported + def stopsMonitoring(actorRef: ActorRef): ActorRef = actorRef - def suspend(): Unit = unsupported + def suspend(): Unit = () - def resume(): Unit = unsupported + def resume(): Unit = () - protected[akka] def restart(cause: Throwable): Unit = unsupported + protected[akka] def restart(cause: Throwable): Unit = () - def stop(): Unit = unsupported + protected[akka] def sendSystemMessage(message: SystemMessage): Unit = () - def address: String = unsupported + protected[akka] def postMessageToMailbox(msg: Any, sender: ActorRef): Unit = () - def isShutdown = false - - protected[akka] def sendSystemMessage(message: SystemMessage) {} - - protected[akka] def postMessageToMailbox(msg: Any, channel: UntypedChannel) {} - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(msg: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = unsupported - - private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = + throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } /** @@ -352,7 +336,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { */ trait MinimalActorRef extends ActorRef with ScalaActorRef { - private[akka] val uuid: Uuid = new com.eaio.uuid.UUID(0L, 0L) //Nil UUID + private[akka] val uuid: Uuid = newUuid() def address = uuid.toString def startsMonitoring(actorRef: ActorRef): ActorRef = actorRef @@ -368,14 +352,10 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { protected[akka] def sendSystemMessage(message: SystemMessage) {} - protected[akka] def postMessageToMailbox(msg: Any, channel: UntypedChannel) {} - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(msg: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = unsupported - - private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) + protected[akka] def postMessageToMailbox(msg: Any, sender: ActorRef) {} } -case class DeadLetter(message: Any, channel: UntypedChannel) +case class DeadLetter(message: Any, sender: ActorRef) class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) @@ -383,12 +363,9 @@ class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { override def isShutdown(): Boolean = true - protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.notify(DeadLetter(message, channel)) + protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = app.eventHandler.notify(DeadLetter(message, sender)) - protected[akka] override def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { app.eventHandler.notify(DeadLetter(message, channel)); brokenPromise } + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = brokenPromise } abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends MinimalActorRef { @@ -398,22 +375,16 @@ abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends protected def whenDone(): Unit - override protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = message match { + override protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit = message match { case akka.actor.Status.Success(r) ⇒ promise.completeWithResult(r) case akka.actor.Status.Failure(f) ⇒ promise.completeWithException(f) case other ⇒ promise.completeWithResult(other) } - override protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - postMessageToMailbox(message, channel) - promise - } + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = + new KeptPromise[Any](Left(new ActorKilledException("Not possible to ask/? a reference to an ask/?.")))(app.dispatcher) override def isShutdown = promise.isCompleted || promise.isExpired override def stop(): Unit = if (!isShutdown) promise.completeWithException(new ActorKilledException("Stopped")) - } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index aa84ef2711..e823b7d1f6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -97,11 +97,18 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { * receive only Supervise/ChildTerminated system messages or Failure message. */ private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new UnsupportedActorRef { + @volatile + var stopped = false + override def address = app.name + ":BubbleWalker" override def toString = address - protected[akka] override def postMessageToMailbox(msg: Any, channel: UntypedChannel) { + def stop() = stopped = true + + def isShutdown = stopped + + protected[akka] override def postMessageToMailbox(msg: Any, sender: ActorRef) { msg match { case Failed(child, ex) ⇒ child.stop() case ChildTerminated(child) ⇒ terminationFuture.completeWithResult(AkkaApplication.Stopped) @@ -205,7 +212,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { // val localOnly = props.localOnly // if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) // else new RoutedActorRef(props, address) - new RoutedActorRef(props, address) + new RoutedActorRef(app, props, address) } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) diff --git a/akka-actor/src/main/scala/akka/actor/Channel.scala b/akka-actor/src/main/scala/akka/actor/Channel.scala deleted file mode 100644 index ce2beb224b..0000000000 --- a/akka-actor/src/main/scala/akka/actor/Channel.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.actor - -/* - * This package is just used to hide the tryTell method from the Scala parts: - * it will be public to javac's eyes by virtue of §5.2 of the SLS. - */ -package japi { - trait Channel[-T] { self: akka.actor.Channel[T] ⇒ - private[japi] def tryTell(msg: T): Boolean = { - try { - self.!(msg)(NullChannel) - true - } catch { - case _: Exception ⇒ false - } - } - } -} - -/** - * Abstraction for unification of sender and senderFuture for later reply. - * Can be stored away and used at a later point in time. - * - * The possible reply channel which can be passed into ! and tryTell is always - * untyped, as there is no way to utilize its real static type without - * requiring runtime-costly manifests. - */ -trait Channel[-T] extends japi.Channel[T] { - - /** - * Scala API.

- * Sends the specified message to the channel. - */ - def !(msg: T)(implicit sender: UntypedChannel): Unit - - /** - * Scala and Java API.

- * Try to send the specified message to the channel, i.e. fire-and-forget - * semantics, including the sender reference if possible (not supported on - * all channels).

- * From Java: - *

-   * actor.tryTell(message);
-   * actor.tryTell(message, context);
-   * 
- *

- * From Scala: - *

-   * actor tryTell message
-   * actor.tryTell(message)(sender)
-   * 
- */ - def tryTell(msg: T)(implicit sender: UntypedChannel): Boolean = { - try { - this.!(msg)(sender) - true - } catch { - case _: Exception ⇒ false - } - } - - /** - * Try to send an exception. Not all channel types support this, one notable - * positive example is Future. Failure to send is silent. - * - * @return whether sending was successful - */ - def sendException(ex: Throwable): Boolean = false - - /** - * Sends the specified message to the channel, i.e. fire-and-forget semantics.

- *

-   * actor.tell(message);
-   * 
- */ - def tell(msg: T): Unit = this.!(msg) - - /** - * Java API.

- * Sends the specified message to the channel, i.e. fire-and-forget - * semantics, including the sender reference if possible (not supported on - * all channels).

- *

-   * actor.tell(message, context);
-   * 
- */ - def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender) - -} - -/** - * This trait marks a channel that a priori does have sending capability, - * i.e. ! is not guaranteed to fail (e.g. NullChannel would be a - * counter-example). - */ -trait AvailableChannel[-T] extends Channel[T] - -/** - * This trait marks a channel which is capable of sending exceptions. - */ -trait ExceptionChannel[-T] extends AvailableChannel[T] - -/** - * This trait marks a channel which carries reply information when tell()ing. - */ -trait ReplyChannel[-T] extends AvailableChannel[T] - -/** - * All channels used in conjunction with MessageInvocation are untyped by - * design, so make this explicit. - */ -trait UntypedChannel extends Channel[Any] - -object UntypedChannel { - implicit def senderOption2Channel(sender: Option[ActorRef]): UntypedChannel = - sender match { - case Some(actor) ⇒ actor - case None ⇒ NullChannel - } - - implicit final val default: UntypedChannel = NullChannel -} - -/** - * Default channel when none available. - */ -case object NullChannel extends UntypedChannel { - def !(msg: Any)(implicit channel: UntypedChannel) { - throw new IllegalActorStateException(""" - No sender in scope, can't reply. - You have probably: - 1. Sent a message to an Actor from an instance that is NOT an Actor. - 2. Invoked a method on an TypedActor from an instance NOT an TypedActor. - You may want to have a look at tryTell for a variant returning a Boolean""") - } - override def tryTell(msg: Any)(implicit channel: UntypedChannel): Boolean = false -} - -/** - * Wraps a forwardable channel. Used implicitly by ScalaActorRef.forward - */ -case class ForwardableChannel(val channel: UntypedChannel) diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 3ea53364ac..2e1202365f 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -478,7 +478,7 @@ trait FSM[S, D] extends ListenerManagement { timeoutFuture = None } generation += 1 - processMsg(value, channel) + processMsg(value, sender) } } @@ -502,7 +502,7 @@ trait FSM[S, D] extends ListenerManagement { nextState.stopReason match { case None ⇒ makeTransition(nextState) case _ ⇒ - nextState.replies.reverse foreach { r ⇒ channel ! r } + nextState.replies.reverse foreach { r ⇒ sender ! r } terminate(nextState) self.stop() } @@ -512,7 +512,7 @@ trait FSM[S, D] extends ListenerManagement { if (!stateFunctions.contains(nextState.stateName)) { terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName))) } else { - nextState.replies.reverse foreach { r ⇒ channel ! r } + nextState.replies.reverse foreach { r ⇒ sender ! r } if (currentState.stateName != nextState.stateName) { handleTransition(currentState.stateName, nextState.stateName) notifyListeners(Transition(self, currentState.stateName, nextState.stateName)) @@ -599,7 +599,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ val srcstr = source match { case s: String ⇒ s case Timer(name, _, _, _) ⇒ "timer " + name - case c: UntypedChannel ⇒ c.toString + case a: ActorRef ⇒ a.toString case _ ⇒ "unknown" } app.eventHandler.debug(context.self, "processing " + event + " from " + srcstr) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index bb8cf568f0..c07cb7a128 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -340,11 +340,14 @@ class TypedActor(val app: AkkaApplication) { try { if (m.isOneWay) m(me) else if (m.returnsFuture_?) { - channel match { - case p: ActorPromise ⇒ p completeWith m(me).asInstanceOf[Future[Any]] - case _ ⇒ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply") + val s = sender + m(me).asInstanceOf[Future[Any]] onComplete { + _.value.get match { + case Left(f) ⇒ s ! akka.actor.Status.Failure(f) + case Right(r) ⇒ s ! r + } } - } else channel ! m(me) + } else sender ! m(me) } finally { TypedActor.selfReference set null diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index ce7ce1a3f2..c0deb28c81 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -21,7 +21,7 @@ import akka.dispatch.{ MessageDispatcher, Promise } * * if (msg.equals("UseReply")) { * // Reply to original sender of message using the 'reply' method - * getContext().getChannel().tell(msg + ":" + getSelf().getAddress()); + * getContext().getSender().tell(msg + ":" + getSelf().getAddress()); * * } else if (msg.equals("UseSender") && getSender().isDefined()) { * // Reply to original sender of message using the sender reference @@ -67,12 +67,7 @@ abstract class UntypedActor extends Actor { * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ - def getSender: ActorRef = sender - - /** - * Abstraction for unification of sender and senderFuture for later reply - */ - def getChannel: UntypedChannel = channel + def getSender(): ActorRef = sender /** * Gets the current receive timeout diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index a87ad02861..0178db875f 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -23,12 +23,9 @@ package object actor { } implicit def future2actor[T](f: akka.dispatch.Future[T]) = new { - def pipeTo(channel: Channel[T]): this.type = { - if (f.isCompleted) { - f.value.get.fold(channel.sendException(_), channel.tryTell(_)) - } else { - f onComplete { _.value.get.fold(channel.sendException(_), channel.tryTell(_)) } - } + def pipeTo(actor: ActorRef): this.type = { + def send(f: akka.dispatch.Future[T]) { f.value.get.fold(f ⇒ actor ! Status.Failure(f), r ⇒ actor ! r) } + if (f.isCompleted) send(f) else f onComplete send this } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index f89451b962..baf972a5b2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -17,7 +17,7 @@ import scala.annotation.tailrec /** * @author Jonas Bonér */ -final case class Envelope(val message: Any, val channel: UntypedChannel) { +final case class Envelope(val message: Any, val sender: ActorRef) { if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") } @@ -107,7 +107,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable object DeadLetterMailbox extends Mailbox(null) { becomeClosed() override def dispatcher = null //MessageDispatcher.this - override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } + override def enqueue(envelope: Envelope) = () override def dequeue() = null override def systemEnqueue(handle: SystemMessage): Unit = () override def systemDrain(): SystemMessage = null diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index a41463999b..86a998b350 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler -import akka.actor.{ UntypedChannel, Timeout, ExceptionChannel } +import akka.actor.{ Timeout } import scala.Option import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } @@ -758,11 +758,6 @@ object Promise { * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) */ def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout) - - /** - * Construct a completable channel - */ - def channel(timeout: Long)(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(timeout) } /** @@ -1024,31 +1019,6 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi private def timeLeftNoinline(): Long = timeLeft() } -class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with UntypedChannel with ExceptionChannel[Any] { - - def !(message: Any)(implicit channel: UntypedChannel) = completeWithResult(message) - - override def sendException(ex: Throwable) = { - completeWithException(ex) - value == Some(Left(ex)) - } - - def channel: UntypedChannel = this - -} - -object ActorPromise { - def apply(f: Promise[Any])(timeout: Timeout = f.timeout): ActorPromise = - new ActorPromise(timeout)(f.dispatcher) { - completeWith(f) - override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message - override def sendException(ex: Throwable) = { - f completeWithException ex - f.value == Some(Left(ex)) - } - } -} - /** * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 9ea8dc78a8..8a26c25a84 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -103,7 +103,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ protected def _route(): Actor.Receive = { // for testing... case Stat ⇒ - channel.tryTell(Stats(_delegates length)) + sender ! Stats(_delegates length) case Terminated(victim) ⇒ _delegates = _delegates filterNot { victim == } case msg ⇒ @@ -285,16 +285,14 @@ trait MailboxPressureCapacitor { /** * Implements pressure() to return the number of actors currently processing a - * message whose reply will be sent to a [[akka.dispatch.Future]]. + * message. * In other words, this capacitor counts how many - * delegates are tied up actively processing a message, as long as the - * messages have somebody waiting on the result. "One way" messages with - * no reply would not be counted. + * delegates are tied up actively processing a message */ -trait ActiveFuturesPressureCapacitor { +trait ActiveActorsPressureCapacitor { def pressure(delegates: Seq[ActorRef]): Int = delegates count { - case a: LocalActorRef ⇒ a.underlying.channel.isInstanceOf[Promise[_]] + case a: LocalActorRef ⇒ !a.underlying.sender.isShutdown case _ ⇒ false } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b68c6016ae..ca7c60a619 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -42,7 +42,7 @@ trait Router { * * @throws RoutingException if something goes wrong while routing the message */ - def route(message: Any)(implicit sender: Option[ActorRef]) + def route(message: Any)(implicit sender: ActorRef) /** * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the @@ -50,7 +50,7 @@ trait Router { * * @throws RoutingExceptionif something goes wrong while routing the message. */ - def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] + def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] } /** @@ -93,34 +93,19 @@ object Routing { /** * An Abstract convenience implementation for building an ActorReference that uses a Router. */ -abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef { - private[akka] override val uuid: Uuid = newUuid - +abstract private[akka] class AbstractRoutedActorRef(val app: AkkaApplication, val props: RoutedProps) extends UnsupportedActorRef { val router = props.routerFactory() - override def postMessageToMailbox(message: Any, channel: UntypedChannel) = { - val sender = channel match { - case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None - } - router.route(message)(sender) - } + override def postMessageToMailbox(message: Any, sender: ActorRef) = router.route(message)(sender) - override def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = { - val sender = channel match { - case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None - } - router.route[Any](message, timeout)(sender) - } + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = app.provider.ask(message, this, timeout) } /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * on (or more) of these actors. */ -private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(routedProps) { +private[akka] class RoutedActorRef(app: AkkaApplication, val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(app, routedProps) { @volatile private var running: Boolean = true @@ -131,7 +116,7 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val ad synchronized { if (running) { running = false - router.route(Routing.Broadcast(PoisonPill))(Some(this)) + router.route(Routing.Broadcast(PoisonPill))(this) } } } @@ -152,7 +137,7 @@ trait BasicRouter extends Router { this.connectionManager = connectionManager } - def route(message: Any)(implicit sender: Option[ActorRef]) = message match { + def route(message: Any)(implicit sender: ActorRef) = message match { case Routing.Broadcast(message) ⇒ //it is a broadcast message, we are going to send to message to all connections. connectionManager.connections.iterable foreach { connection ⇒ @@ -180,7 +165,7 @@ trait BasicRouter extends Router { } } - def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match { + def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match { case Routing.Broadcast(message) ⇒ throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.") case _ ⇒ @@ -188,8 +173,7 @@ trait BasicRouter extends Router { next match { case Some(connection) ⇒ try { - // FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef - connection.?(message, timeout)(sender).asInstanceOf[Future[T]] + connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it? } catch { case e: Exception ⇒ connectionManager.remove(connection) @@ -202,10 +186,7 @@ trait BasicRouter extends Router { protected def next: Option[ActorRef] - private def throwNoConnectionsError = { - val error = new RoutingException("No replica connections for router") - throw error - } + private def throwNoConnectionsError = throw new RoutingException("No replica connections for router") } /** @@ -359,11 +340,11 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { */ protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] - private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = { + private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[G] = { val responses = connectionManager.connections.iterable.flatMap { actor ⇒ try { if (actor.isShutdown) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace - Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]]) + Some(actor.?(message, timeout).asInstanceOf[Future[S]]) } catch { case e: Exception ⇒ connectionManager.remove(actor) @@ -376,7 +357,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { else gather(responses) } - override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match { + override def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match { case Routing.Broadcast(message) ⇒ scatterGather(message, timeout) case message ⇒ super.route(message, timeout)(sender) } diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala index eb4204ce0e..42b38e5bb3 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala @@ -24,7 +24,7 @@ object TypedCamelTestSupport { def countdown: Handler = { case SetExpectedMessageCount(num) ⇒ { latch = new CountDownLatch(num) - channel ! latch + sender ! latch } case msg ⇒ latch.countDown } @@ -32,7 +32,7 @@ object TypedCamelTestSupport { trait Respond { this: Actor ⇒ def respond: Handler = { - case msg: Message ⇒ channel ! response(msg) + case msg: Message ⇒ sender ! response(msg) } def response(msg: Message): Any = "Hello %s" format msg.body @@ -42,8 +42,8 @@ object TypedCamelTestSupport { val messages = Buffer[Any]() def retain: Handler = { - case GetRetainedMessage ⇒ channel ! messages.last - case GetRetainedMessages(p) ⇒ channel ! messages.filter(p).toList + case GetRetainedMessage ⇒ sender ! messages.last + case GetRetainedMessages(p) ⇒ sender ! messages.filter(p).toList case msg ⇒ { messages += msg msg diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala index 062c6246b5..eef98f4f25 100644 --- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala @@ -127,11 +127,11 @@ private[camel] class ActivationTracker extends Actor { def receive = { case SetExpectedActivationCount(num) ⇒ { activationLatch = new CountDownLatch(num) - channel ! activationLatch + sender ! activationLatch } case SetExpectedDeactivationCount(num) ⇒ { deactivationLatch = new CountDownLatch(num) - channel ! deactivationLatch + sender ! deactivationLatch } case EndpointActivated ⇒ activationLatch.countDown case EndpointDeactivated ⇒ deactivationLatch.countDown diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index bdc0079251..8c65d71c66 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -97,9 +97,9 @@ trait ProducerSupport { this: Actor ⇒ val exchange = createExchange(pattern).fromRequestMessage(cmsg) processor.process(exchange, new AsyncCallback { val producer = self - // Need copies of channel reference here since the callback could be done + // Need copies of sender reference here since the callback could be done // later by another thread. - val replyChannel = channel + val replyChannel = sender def done(doneSync: Boolean) { (doneSync, exchange.isFailed) match { @@ -159,7 +159,7 @@ trait ProducerSupport { this: Actor ⇒ * actor). */ protected def receiveAfterProduce: Receive = { - case msg ⇒ if (!oneway) channel ! msg + case msg ⇒ if (!oneway) sender ! msg } /** diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index 7ebbcc9a13..7a941679f1 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -16,7 +16,7 @@ import akka.actor._ import akka.camel.{ Ack, Failure, Message } import akka.camel.CamelMessageConversion.toExchangeAdapter import scala.reflect.BeanProperty -import akka.dispatch.{ FutureTimeoutException, Promise, MessageDispatcher } +import akka.dispatch._ /** * @author Martin Krasser @@ -274,9 +274,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def resume(): Unit = () - def stop() { - running = false - } + def stop() { running = false } /** * Populates the initial exchange with the reply message and uses the @@ -286,7 +284,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall * @param message reply message * @param sender ignored */ - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) = if(running) { + protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef) = if(running) { message match { case Ack ⇒ { /* no response message to set */ } case msg: Failure ⇒ exchange.fromFailureMessage(msg) @@ -298,7 +296,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = + new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))) def restart(reason: Throwable): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName) diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java index 93d0427902..0a0c0c7c35 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java @@ -15,7 +15,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor { Message msg = (Message)message; String body = msg.getBodyAs(String.class); String header = msg.getHeaderAs("test", String.class); - channel.tryTell(String.format("%s %s", body, header)); + sender.tell(String.format("%s %s", body, header)); } } diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java index db3ea7665d..5b29ab2300 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java @@ -17,7 +17,7 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor { Message msg = (Message)message; String body = msg.getBodyAs(String.class); String header = msg.getHeaderAs("test", String.class); - channel.tryTell(String.format("%s %s", body, header)); + sender.tell(String.format("%s %s", body, header)); } } diff --git a/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala b/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala index d099dba708..01247ced2c 100644 --- a/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala @@ -36,7 +36,7 @@ object CamelTestSupport { def countdown: Handler = { case SetExpectedMessageCount(num) ⇒ { latch = new CountDownLatch(num) - channel ! latch + sender ! latch } case msg ⇒ latch.countDown } @@ -44,7 +44,7 @@ object CamelTestSupport { trait Respond { this: Actor ⇒ def respond: Handler = { - case msg: Message ⇒ channel ! response(msg) + case msg: Message ⇒ sender ! response(msg) } def response(msg: Message): Any = "Hello %s" format msg.body @@ -54,8 +54,8 @@ object CamelTestSupport { val messages = Buffer[Any]() def retain: Handler = { - case GetRetainedMessage ⇒ channel ! messages.last - case GetRetainedMessages(p) ⇒ channel ! messages.filter(p).toList + case GetRetainedMessage ⇒ sender ! messages.last + case GetRetainedMessages(p) ⇒ sender ! messages.filter(p).toList case msg ⇒ { messages += msg msg diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index 7a83e09de7..97eb8b49a3 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -211,7 +211,7 @@ object ConsumerScalaTest { class TestConsumer(uri: String) extends Actor with Consumer { def endpointUri = uri protected def receive = { - case msg: Message ⇒ channel ! "received %s" format msg.body + case msg: Message ⇒ sender ! "received %s" format msg.body } } @@ -226,7 +226,7 @@ object ConsumerScalaTest { def endpointUri = uri override def autoack = false protected def receive = { - case msg: Message ⇒ channel ! Ack + case msg: Message ⇒ sender ! Ack } } @@ -247,15 +247,15 @@ object ConsumerScalaTest { protected def receive = { case "fail" ⇒ { throw new Exception("test") } - case "succeed" ⇒ channel ! "ok" + case "succeed" ⇒ sender ! "ok" } override def preRestart(reason: scala.Throwable, msg: Option[Any]) { - channel.tryTell("pr") + sender.tell("pr") } override def postStop { - channel.tryTell("ps") + sender.tell("ps") } } @@ -288,7 +288,7 @@ object ConsumerScalaTest { } private def respondTo(msg: Message) = - if (valid) channel ! ("accepted: %s" format msg.body) + if (valid) sender ! ("accepted: %s" format msg.body) else throw new Exception("rejected: %s" format msg.body) } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index ad1ec6b5cf..c2614d2263 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -253,16 +253,16 @@ object ProducerFeatureTest { class TestResponder extends Actor { protected def receive = { case msg: Message ⇒ msg.body match { - case "fail" ⇒ channel ! Failure(new Exception("failure"), msg.headers) - case _ ⇒ channel ! (msg.transformBody { body: String ⇒ "received %s" format body }) + case "fail" ⇒ sender ! Failure(new Exception("failure"), msg.headers) + case _ ⇒ sender ! (msg.transformBody { body: String ⇒ "received %s" format body }) } } } class ReplyingForwardTarget extends Actor { protected def receive = { - case msg: Message ⇒ channel ! msg.addHeader("test" -> "result") - case msg: Failure ⇒ channel ! Failure(msg.cause, msg.headers + ("test" -> "failure")) + case msg: Message ⇒ sender ! msg.addHeader("test" -> "result") + case msg: Failure ⇒ sender ! Failure(msg.cause, msg.headers + ("test" -> "failure")) } } diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala index c05ceffaaa..24fc306268 100644 --- a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala @@ -96,13 +96,13 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with object ActorComponentFeatureTest { class CustomIdActor extends Actor { protected def receive = { - case msg: Message ⇒ channel ! ("Received %s" format msg.body) + case msg: Message ⇒ sender ! ("Received %s" format msg.body) } } class FailWithMessage extends Actor { protected def receive = { - case msg: Message ⇒ channel ! Failure(new Exception("test")) + case msg: Message ⇒ sender ! Failure(new Exception("test")) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 79f740b3b7..db5f5306d9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -300,7 +300,7 @@ class DefaultClusterNode private[akka] ( val remote = new akka.cluster.netty.NettyRemoteSupport remote.start(hostname, port) remote.register(RemoteClusterDaemon.Address, remoteDaemon) - remote.addListener(RemoteFailureDetector.channel) + remote.addListener(RemoteFailureDetector.sender) remote.addListener(remoteClientLifeCycleHandler) remote } @@ -427,7 +427,7 @@ class DefaultClusterNode private[akka] ( remoteService.shutdown() // shutdown server - RemoteFailureDetector.channel.stop() + RemoteFailureDetector.sender.stop() remoteClientLifeCycleHandler.stop() remoteDaemon.stop() diff --git a/akka-docs/intro/code/tutorials/first/Pi.scala b/akka-docs/intro/code/tutorials/first/Pi.scala index c346db3820..142f598dad 100644 --- a/akka-docs/intro/code/tutorials/first/Pi.scala +++ b/akka-docs/intro/code/tutorials/first/Pi.scala @@ -50,7 +50,7 @@ object Pi extends App { //#calculatePiFor def receive = { - case Work(start, nrOfElements) ⇒ channel ! Result(calculatePiFor(start, nrOfElements)) // perform the work + case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work } } //#worker diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala index af7ab12983..dba7f16958 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -91,7 +91,7 @@ case class DurableDispatcher( override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef) protected[akka] override def dispatch(invocation: MessageInvocation) { - if (invocation.channel.isInstanceOf[ActorPromise]) + if (invocation.sender.isInstanceOf[ActorPromise]) throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from ?") super.dispatch(invocation) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 69b553ac03..b6410b2c86 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -50,7 +50,7 @@ abstract class DurableExecutableMailbox(owner: LocalActorRef) extends MessageQue val builder = DurableMailboxMessageProtocol.newBuilder .setOwnerAddress(ownerAddress) .setMessage(message.toByteString) - durableMessage.channel match { + durableMessage.sender match { case a: ActorRef ⇒ builder.setSenderAddress(a.address) case _ ⇒ } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 18f47bf0d4..0f852a2e8e 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -35,7 +35,7 @@ object BSONSerializableMailbox extends SerializableBSONObject[MongoDurableMessag b += "_id" -> msg._id b += "ownerAddress" -> msg.ownerAddress - msg.channel match { + msg.sender match { case a: ActorRef ⇒ { b += "senderAddress" -> a.address } case _ ⇒ () } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index e8ce6957b3..8582f881af 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -47,7 +47,7 @@ class MongoBasedNaiveMailbox(val owner: LocalActorRef) extends DurableExecutable EventHandler.debug(this, "\nENQUEUING message in mongodb-based mailbox [%s]".format(msg)) /* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */ - val durableMessage = MongoDurableMessage(ownerAddress, msg.receiver, msg.message, msg.channel) + val durableMessage = MongoDurableMessage(ownerAddress, msg.receiver, msg.message, msg.sender) // todo - do we need to filter the actor name at all for safe collection naming? val result = new DefaultPromise[Boolean](writeTimeout) mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒ diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala index 93d4951fe7..5678feba03 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala @@ -35,10 +35,10 @@ import org.bson.collection._ case class MongoDurableMessage(val ownerAddress: String, val receiver: LocalActorRef, val message: Any, - val channel: UntypedChannel, + val sender: UntypedChannel, val _id: ObjectId = new ObjectId) { - def messageInvocation() = MessageInvocation(this.receiver, this.message, this.channel) + def messageInvocation() = MessageInvocation(this.receiver, this.message, this.sender) } // vim: set ts=2 sw=2 sts=2 et: diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index a079ee8ecd..109804f73f 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -10,7 +10,7 @@ import javax.servlet.http.{ HttpServletResponse, HttpServletRequest } import javax.servlet.http.HttpServlet import javax.servlet.Filter import java.lang.UnsupportedOperationException -import akka.actor.{ NullChannel, ActorRef, Actor } +import akka.actor.{ ActorRef, Actor } import Types._ import akka.AkkaApplication @@ -246,10 +246,8 @@ trait Endpoint { this: Actor ⇒ if (!endpoints.isEmpty) endpoints.foreach { _.apply(uri) ! req } else { - channel match { - case null | NullChannel ⇒ _na(uri, req) - case channel ⇒ channel ! NoneAvailable(uri, req) - } + if (sender.isShutdown) _na(uri, req) + else sender ! NoneAvailable(uri, req) } } } diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index efd645cefd..872b2c23f3 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -13,7 +13,7 @@ import akka.AkkaApplication /** * Stream of all kinds of network events, remote failure and connection events, cluster failure and connection events etc. - * Also provides API for channel listener management. + * Also provides API for sender listener management. * * @author Jonas Bonér */ @@ -65,7 +65,7 @@ class NetworkEventStream(val app: AkkaApplication) { import NetworkEventStream._ // FIXME: check that this supervision is correct - private[akka] val channel = app.provider.actorOf( + private[akka] val sender = app.provider.actorOf( Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), app.guardian, Props.randomAddress, systemService = true) @@ -73,11 +73,11 @@ class NetworkEventStream(val app: AkkaApplication) { * Registers a network event stream listener (asyncronously). */ def register(listener: Listener, connectionAddress: InetSocketAddress) = - channel ! Register(listener, connectionAddress) + sender ! Register(listener, connectionAddress) /** * Unregisters a network event stream listener (asyncronously) . */ def unregister(listener: Listener, connectionAddress: InetSocketAddress) = - channel ! Unregister(listener, connectionAddress) + sender ! Unregister(listener, connectionAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 550b1a20c7..82c3ea823d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -75,8 +75,10 @@ class Remote(val app: AkkaApplication) extends RemoteService { val remote = new akka.remote.netty.NettyRemoteSupport(app) remote.start(hostname, port) remote.register(remoteDaemonServiceName, remoteDaemon) - app.eventHandler.addListener(eventStream.channel) + + app.eventHandler.addListener(eventStream.sender) app.eventHandler.addListener(remoteClientLifeCycleHandler) + // TODO actually register this provider in app in remote mode //provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) remote @@ -161,10 +163,10 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { eventHandler.error(this, "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [%s]".format(message)) } - channel ! Success(address.toString) + sender ! Success(address.toString) } catch { case error: Throwable ⇒ - channel ! Failure(error) + sender ! Failure(error) throw error } } @@ -182,10 +184,10 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { // gossiper tell gossip - // channel ! Success(address.toString) + // sender ! Success(address.toString) // } catch { // case error: Throwable ⇒ - // channel ! Failure(error) + // sender ! Failure(error) // throw error // } } @@ -204,7 +206,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { new LocalActorRef(app, Props( context ⇒ { - case f: Function0[_] ⇒ try { channel ! f() } finally { context.self.stop() } + case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() } }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } @@ -222,7 +224,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { new LocalActorRef(app, Props( context ⇒ { - case (fun: Function[_, _], param: Any) ⇒ try { channel ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() } + case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() } }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index b87eae50aa..c65c05bff6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -33,7 +33,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise - private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new UnsupportedActorRef {} + private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime private[akka] def terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher) val local = new LocalActorRefProvider(app) @@ -143,7 +143,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider // FIXME: implement supervision def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = { if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") - new RoutedActorRef(props, address) + new RoutedActorRef(app, props, address) } def actorFor(address: String): Option[ActorRef] = actors.get(address) match { @@ -231,9 +231,9 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - val remote: RemoteSupport, - val remoteAddress: InetSocketAddress, - val address: String, + remote: RemoteSupport, + remoteAddress: InetSocketAddress, + address: String, loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { @@ -246,23 +246,11 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported - def postMessageToMailbox(message: Any, channel: UntypedChannel) { - val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None - remote.send[Any](message, chSender, None, remoteAddress, true, this, loader) + def postMessageToMailbox(message: Any, sender: ActorRef) { + remote.send[Any](message, Some(sender), None, remoteAddress, true, this, loader) } - def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - - val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None - val chFuture = if (channel.isInstanceOf[Promise[_]]) Some(channel.asInstanceOf[Promise[Any]]) else None - val future = remote.send[Any](message, chSender, chFuture, remoteAddress, false, this, loader) - - if (future.isDefined) ActorPromise(future.get)(timeout) - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) def suspend(): Unit = unsupported @@ -272,7 +260,7 @@ private[akka] case class RemoteActorRef private[akka] ( synchronized { if (running) { running = false - postMessageToMailbox(Terminate, None) + postMessageToMailbox(new Terminate(), remote.app.deadLetters) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d143b20013..ac29e0d5b1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -27,7 +27,7 @@ import java.util.concurrent.atomic._ import akka.AkkaException import akka.AkkaApplication import akka.serialization.RemoteActorSerialization -import akka.dispatch.{ Terminate, ActorPromise, DefaultPromise, Promise } +import akka.dispatch.{ Terminate, DefaultPromise, Promise } class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); @@ -147,7 +147,7 @@ abstract class RemoteClient private[akka] ( remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort - val serialization = new RemoteActorSerialization(app, remoteSupport) + val serialization = new RemoteActorSerialization(remoteSupport) protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] @@ -587,7 +587,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod val settings = new RemoteServerSettings(app) import settings._ - val serialization = new RemoteActorSerialization(app, serverModule.remoteSupport) + val serialization = new RemoteActorSerialization(serverModule.remoteSupport) val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) @@ -952,42 +952,18 @@ class RemoteServerHandler( } val message = MessageSerializer.deserialize(app, request.getMessage) - val sender = - if (request.hasSender) Some(serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) - else None + val sender = if (request.hasSender) serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) else app.deadLetters message match { // first match on system messages - case Terminate ⇒ - if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") - else actorRef.stop() + case _: Terminate ⇒ + if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop() case _: AutoReceivedMessage if (UNTRUSTED_MODE) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") case _ ⇒ // then match on user defined messages - if (request.getOneWay) actorRef.!(message)(sender) - else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout( - message, - request.getActorInfo.getTimeout, - new ActorPromise(request.getActorInfo.getTimeout). - onComplete(_.value.get match { - case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) - case r: Right[_, _] ⇒ - val messageBuilder = serialization.createRemoteMessageProtocolBuilder( - Some(actorRef), - Right(request.getUuid), - actorInfo.getAddress, - actorInfo.getTimeout, - r.asInstanceOf[Either[Throwable, Any]], - isOneWay = true, - Some(actorRef)) - - // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - - write(channel, RemoteEncoder.encode(messageBuilder.build)) - })) + actorRef.!(message)(sender) } } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index c4bcfce6ab..c49e8c5dbe 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -28,7 +28,7 @@ import com.eaio.uuid.UUID class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default - val remoteActorSerialization = new RemoteActorSerialization(app, remote) + val remoteActorSerialization = new RemoteActorSerialization(remote) def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) @@ -107,7 +107,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { app.AkkaConfig.ActorTimeoutMillis, Right(m.message), false, - m.channel match { + m.sender match { case a: ActorRef ⇒ Some(a) case _ ⇒ None }) @@ -221,7 +221,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { } } -class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { +class RemoteActorSerialization(remote: RemoteSupport) { /** * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. @@ -239,7 +239,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) + remote.app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) val ref = RemoteActorRef( remote, @@ -247,7 +247,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) protocol.getAddress, loader) - app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) + remote.app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) ref } @@ -261,17 +261,17 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) ar.remoteAddress case ar: LocalActorRef ⇒ remote.registerByUuid(ar) - app.defaultAddress + remote.app.defaultAddress case _ ⇒ - app.defaultAddress + remote.app.defaultAddress } - app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) + remote.app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setAddress(actor.address) - .setTimeout(app.AkkaConfig.ActorTimeoutMillis) + .setTimeout(remote.app.AkkaConfig.ActorTimeoutMillis) .build } @@ -305,7 +305,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) message match { case Right(message) ⇒ - messageBuilder.setMessage(MessageSerializer.serialize(app, message.asInstanceOf[AnyRef])) + messageBuilder.setMessage(MessageSerializer.serialize(remote.app, message.asInstanceOf[AnyRef])) case Left(exception) ⇒ messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala index c3cfedfc0b..1577066d67 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ channel ! app.nodename + case "identify" ⇒ sender ! app.nodename } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala index 375b380f8c..b1e8f793b9 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala @@ -8,7 +8,7 @@ object NewRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ channel ! app.nodename + case "identify" ⇒ sender ! app.nodename } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index 5001227865..380f4d1712 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -9,7 +9,7 @@ object RandomRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ channel ! app.nodename + case "hit" ⇒ sender ! app.nodename case "end" ⇒ self.stop() } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index afc8fa13fa..a076a91786 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -9,7 +9,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ channel ! app.nodename + case "hit" ⇒ sender ! app.nodename case "end" ⇒ self.stop() } } diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 9eeab66c7a..a7d8b374e7 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -129,7 +129,7 @@ class MyJavaSerializableActor extends Actor with scala.Serializable { def receive = { case "hello" ⇒ count = count + 1 - channel ! "world " + count + sender ! "world " + count } } @@ -137,7 +137,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializabl def receive = { case "hello" ⇒ Thread.sleep(500) - case "hello-reply" ⇒ channel ! "world" + case "hello-reply" ⇒ sender ! "world" } } @@ -145,7 +145,7 @@ class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable def receive = { case m: Message ⇒ Thread.sleep(500) - case "hello-reply" ⇒ channel ! "world" + case "hello-reply" ⇒ sender ! "world" } } @@ -153,6 +153,6 @@ class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable { def receive = { case p: Person ⇒ Thread.sleep(500) - case "hello-reply" ⇒ channel ! "hello" + case "hello-reply" ⇒ sender ! "hello" } } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java index dc89a22ceb..718f8f9606 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/UntypedConsumer1.java @@ -15,6 +15,6 @@ public class UntypedConsumer1 extends UntypedConsumerActor { public void onReceive(Object message) { Message msg = (Message)message; String body = msg.getBodyAs(String.class); - channel.tryTell(String.format("received %s", body)); + sender.tell(String.format("received %s", body)); } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala index 35654bc264..f4655c3985 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Actors.scala @@ -12,7 +12,7 @@ class RemoteActor1 extends Actor with Consumer { def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1" protected def receive = { - case msg: Message ⇒ channel ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")) + case msg: Message ⇒ sender ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")) } } @@ -23,7 +23,7 @@ class RemoteActor2 extends Actor with Consumer { def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2" protected def receive = { - case msg: Message ⇒ channel ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")) + case msg: Message ⇒ sender ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")) } } @@ -44,7 +44,7 @@ class Consumer2 extends Actor with Consumer { def endpointUri = "jetty:http://0.0.0.0:8877/camel/default" def receive = { - case msg: Message ⇒ channel ! ("Hello %s" format msg.bodyAs[String]) + case msg: Message ⇒ sender ! ("Hello %s" format msg.bodyAs[String]) } } @@ -62,10 +62,10 @@ class Consumer4 extends Actor with Consumer { def receive = { case msg: Message ⇒ msg.bodyAs[String] match { case "stop" ⇒ { - channel ! "Consumer4 stopped" + sender ! "Consumer4 stopped" self.stop } - case body ⇒ channel ! body + case body ⇒ sender ! body } } } @@ -76,7 +76,7 @@ class Consumer5 extends Actor with Consumer { def receive = { case _ ⇒ { Actor.actorOf[Consumer4] - channel ! "Consumer4 started" + sender ! "Consumer4 started" } } } @@ -106,7 +106,7 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu protected def receive = { case msg: Message ⇒ { publisher ! msg.bodyAs[String] - channel ! "message published" + sender ! "message published" } } } @@ -135,8 +135,8 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer { class HttpTransformer extends Actor { protected def receive = { - case msg: Message ⇒ channel ! (msg.transformBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") }) - case msg: Failure ⇒ channel ! msg + case msg: Message ⇒ sender ! (msg.transformBody { body: String ⇒ body replaceAll ("Akka ", "AKKA ") }) + case msg: Failure ⇒ sender ! msg } } @@ -150,11 +150,11 @@ class FileConsumer extends Actor with Consumer { case msg: Message ⇒ { if (counter == 2) { println("received %s" format msg.bodyAs[String]) - channel ! Ack + sender ! Ack } else { println("rejected %s" format msg.bodyAs[String]) counter += 1 - channel ! Failure(new Exception("message number %s not accepted" format counter)) + sender ! Failure(new Exception("message number %s not accepted" format counter)) } } } diff --git a/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java index cc4c5d8c48..4d90518a11 100644 --- a/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java +++ b/akka-samples/akka-sample-camel/src/test/java/sample/camel/SampleRemoteUntypedConsumer.java @@ -15,7 +15,7 @@ public class SampleRemoteUntypedConsumer extends UntypedConsumerActor { Message msg = (Message)message; String body = msg.getBodyAs(String.class); String header = msg.getHeaderAs("test", String.class); - channel.tryTell(String.format("%s %s", body, header)); + sender().tell(String.format("%s %s", body, header)); } } diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala index f10f050633..6fca5b42f8 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala @@ -66,7 +66,7 @@ object HttpConcurrencyTestStress { var correlationIds = Set[Any]() override protected def receive = { - case "getCorrelationIdCount" ⇒ channel ! correlationIds.size + case "getCorrelationIdCount" ⇒ sender ! correlationIds.size case msg ⇒ super.receive(msg) } @@ -93,7 +93,7 @@ object HttpConcurrencyTestStress { class HttpServerWorker extends Actor { protected def receive = { - case msg ⇒ channel ! msg + case msg ⇒ sender ! msg } } } diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala index e5433b1096..aec3a92804 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/RemoteConsumerTest.scala @@ -94,8 +94,8 @@ object RemoteConsumerTest { def endpointUri = "direct:remote-consumer" protected def receive = { - case "init" ⇒ channel ! "done" - case m: Message ⇒ channel ! ("remote actor: %s" format m.body) + case "init" ⇒ sender ! "done" + case m: Message ⇒ sender ! ("remote actor: %s" format m.body) } } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 937bb2488c..a54939e789 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -1,6 +1,6 @@ package sample.fsm.dining.fsm -import akka.actor.{ ActorRef, Actor, FSM, UntypedChannel, NullChannel } +import akka.actor.{ ActorRef, Actor, FSM } import akka.actor.FSM._ import akka.util.Duration import akka.util.duration._ @@ -25,7 +25,7 @@ case object Taken extends ChopstickState /** * Some state container for the chopstick */ -case class TakenBy(hakker: UntypedChannel) +case class TakenBy(hakker: ActorRef) /* * A chopstick is an actor, it can be taken, and put back @@ -33,12 +33,12 @@ case class TakenBy(hakker: UntypedChannel) class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { // A chopstick begins its existence as available and taken by no one - startWith(Available, TakenBy(NullChannel)) + startWith(Available, TakenBy(app.deadLetters)) // When a chopstick is available, it can be taken by a some hakker when(Available) { case Event(Take, _) ⇒ - goto(Taken) using TakenBy(channel) replying Taken(self) + goto(Taken) using TakenBy(sender) replying Taken(self) } // When a chopstick is taken by a hakker @@ -47,8 +47,8 @@ class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { when(Taken) { case Event(Take, currentState) ⇒ stay replying Busy(self) - case Event(Put, TakenBy(hakker)) if channel == hakker ⇒ - goto(Available) using TakenBy(NullChannel) + case Event(Put, TakenBy(hakker)) if sender == hakker ⇒ + goto(Available) using TakenBy(app.deadLetters) } // Initialze the chopstick diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 175bd87c5c..c2d0e1b485 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -287,10 +287,9 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor { val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false) def receive = { - case update: Update[_] ⇒ - channel.tryTell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) - case Get ⇒ channel ! agent.get - case _ ⇒ () + case update: Update[_] ⇒ sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) + case Get ⇒ sender ! agent.get + case _ ⇒ } } @@ -302,7 +301,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { def receive = { case update: Update[_] ⇒ try { - channel.tryTell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) + sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) } finally { agent.resume() self.stop() diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java index 035c195c7e..3d76ff37c2 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java @@ -32,7 +32,7 @@ public class UntypedCoordinatedCounter extends UntypedActor { } else if (incoming instanceof String) { String message = (String) incoming; if (message.equals("GetCount")) { - getChannel().tell(count.get()); + getSender().tell(count.get()); } } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java index 377e3560da..b580ee88f8 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java @@ -26,7 +26,7 @@ public class UntypedCounter extends UntypedTransactor { @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getChannel().tell(count.get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java index 90636b99f3..2ac3731f06 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java @@ -57,7 +57,7 @@ public class UntypedCoordinatedCounter extends UntypedActor { } else if (incoming instanceof String) { String message = (String) incoming; if (message.equals("GetCount")) { - getChannel().tell(count.get()); + getSender().tell(count.get()); } } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java index 4e3f3fde71..d4d53b084c 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java @@ -70,7 +70,7 @@ public class UntypedCounter extends UntypedTransactor { @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getChannel().tell(count.get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala index 284786ef5a..41d6f787a5 100644 --- a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala @@ -34,7 +34,7 @@ object CoordinatedIncrement { } } - case GetCount ⇒ channel ! count.get + case GetCount ⇒ sender ! count.get } } diff --git a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala index e4b0eed68e..2d54032413 100644 --- a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala @@ -56,7 +56,7 @@ object FickleFriends { } } - case GetCount ⇒ channel ! count.get + case GetCount ⇒ sender ! count.get } } @@ -93,7 +93,7 @@ object FickleFriends { } } - case GetCount ⇒ channel ! count.get + case GetCount ⇒ sender ! count.get } } } diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala index 7c8dda761a..fef06f91e1 100644 --- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala +++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala @@ -49,7 +49,7 @@ object TransactorIncrement { } override def normally = { - case GetCount ⇒ channel ! count.get + case GetCount ⇒ sender ! count.get } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ea38de78a1..b65af2be6c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -165,18 +165,12 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling val queue = mbox.queue val execute = mbox.suspendSwitch.fold { queue.push(handle) - if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format receiver) - } false } { queue.push(handle) - if (queue.isActive) { - if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format receiver) - } + if (queue.isActive) false - } else { + else { queue.enter true } @@ -214,11 +208,6 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling try { if (Mailbox.debug) println(mbox.actor + " processing message " + handle) mbox.actor.invoke(handle) - if (warnings) handle.channel match { - case f: ActorPromise if !f.isCompleted ⇒ - app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (mbox.actor, handle.message)) - case _ ⇒ - } true } catch { case ie: InterruptedException ⇒ diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c7fc73bb05..c3df4e1177 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -20,12 +20,12 @@ object TestActor { trait Message { def msg: AnyRef - def channel: UntypedChannel + def sender: ActorRef } - case class RealMessage(msg: AnyRef, channel: UntypedChannel) extends Message + case class RealMessage(msg: AnyRef, sender: ActorRef) extends Message case object NullMessage extends Message { override def msg: AnyRef = throw new IllegalActorStateException("last receive did not dequeue a message") - override def channel: UntypedChannel = throw new IllegalActorStateException("last receive did not dequeue a message") + override def sender: ActorRef = throw new IllegalActorStateException("last receive did not dequeue a message") } } @@ -44,7 +44,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[ case Event(x: AnyRef, data) ⇒ val observe = data map (ignoreFunc ⇒ if (ignoreFunc isDefinedAt x) !ignoreFunc(x) else true) getOrElse true if (observe) - queue.offerLast(RealMessage(x, channel)) + queue.offerLast(RealMessage(x, sender)) stay } @@ -579,13 +579,13 @@ class TestProbe(_application: AkkaApplication) extends TestKit(_application) { * Forward this message as if in the TestActor's receive method with self.forward. */ def forward(actor: ActorRef, msg: AnyRef = lastMessage.msg) { - actor.!(msg)(lastMessage.channel) + actor.!(msg)(lastMessage.sender) } /** - * Get channel of last received message. + * Get sender of last received message. */ - def channel = lastMessage.channel + def sender = lastMessage.sender } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 3fb594a91e..c5a989c17e 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -8,6 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.event.EventHandler import akka.dispatch.{ Future, Promise } +import akka.util.duration._ import akka.AkkaApplication /** @@ -36,31 +37,27 @@ object TestActorRefSpec { } class ReplyActor extends TActor { - var replyTo: Channel[Any] = null + var replyTo: ActorRef = null def receiveT = { case "complexRequest" ⇒ { - replyTo = channel + replyTo = sender val worker = TestActorRef(Props[WorkerActor]) worker ! "work" } case "complexRequest2" ⇒ val worker = TestActorRef(Props[WorkerActor]) - worker ! channel + worker ! sender case "workDone" ⇒ replyTo ! "complexReply" - case "simpleRequest" ⇒ channel ! "simpleReply" + case "simpleRequest" ⇒ sender ! "simpleReply" } } class WorkerActor() extends TActor { def receiveT = { - case "work" ⇒ { - channel ! "workDone" - self.stop() - } - case replyTo: UntypedChannel ⇒ { - replyTo ! "complexReply" - } + case "work" ⇒ sender ! "workDone"; self.stop() + case replyTo: Promise[Any] ⇒ replyTo.completeWithResult("complexReply") + case replyTo: ActorRef ⇒ replyTo ! "complexReply" } } @@ -110,7 +107,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "used with TestActorRef" in { val a = TestActorRef(Props(new Actor { val nested = TestActorRef(Props(self ⇒ { case _ ⇒ })) - def receive = { case _ ⇒ channel ! nested } + def receive = { case _ ⇒ sender ! nested } })) a must not be (null) val nested = (a ? "any").as[ActorRef].get @@ -121,7 +118,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "used with ActorRef" in { val a = TestActorRef(Props(new Actor { val nested = context.actorOf(Props(self ⇒ { case _ ⇒ })) - def receive = { case _ ⇒ channel ! nested } + def receive = { case _ ⇒ sender ! nested } })) a must not be (null) val nested = (a ? "any").as[ActorRef].get @@ -131,7 +128,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { } - "support reply via channel" in { + "support reply via sender" in { val serverRef = TestActorRef(Props[ReplyActor]) val clientRef = TestActorRef(Props(new SenderActor(serverRef))) @@ -159,8 +156,10 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "stop when sent a poison pill" in { filterEvents(EventFilter[ActorKilledException]) { val a = TestActorRef(Props[WorkerActor]) - intercept[ActorKilledException] { - (a ? PoisonPill).get + testActor startsMonitoring a + a.!(PoisonPill)(testActor) + expectMsgPF(5 seconds) { + case Terminated(`a`) ⇒ true } a must be('shutdown) assertThread @@ -224,8 +223,8 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "proxy apply for the underlying actor" in { val ref = TestActorRef[WorkerActor] - intercept[IllegalActorStateException] { ref("work") } - val ch = Promise.channel(5000) + ref("work") + val ch = Promise[String](5000) ref ! ch ch must be('completed) ch.get must be("complexReply") diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 930e5c1454..c2842df0d6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -17,7 +17,7 @@ class TestProbeSpec extends AkkaSpec { val tk = TestProbe() val future = tk.ref ? "hello" tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher - tk.lastMessage.channel ! "world" + tk.lastMessage.sender ! "world" future must be('completed) future.get must equal("world") } @@ -27,7 +27,7 @@ class TestProbeSpec extends AkkaSpec { val tk2 = TestProbe() tk1.ref.!("hello")(tk2.ref) tk1.expectMsg(0 millis, "hello") - tk1.lastMessage.channel ! "world" + tk1.lastMessage.sender ! "world" tk2.expectMsg(0 millis, "world") } @@ -36,7 +36,7 @@ class TestProbeSpec extends AkkaSpec { val probe2 = TestProbe() probe1.send(probe2.ref, "hello") probe2.expectMsg(0 millis, "hello") - probe2.lastMessage.channel ! "world" + probe2.lastMessage.sender ! "world" probe1.expectMsg(0 millis, "world") } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 1a179052ec..fbb3a2cc14 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -83,7 +83,7 @@ public class Pi { double result = calculatePiFor(work.getStart(), work.getNrOfElements()); // reply with the result - getChannel().tell(new Result(result)); + getSender().tell(new Result(result)); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); } diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 05e05abf20..eff1767bbf 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -42,8 +42,7 @@ object Pi extends App { } def receive = { - case Work(start, nrOfElements) ⇒ - channel ! Result(calculatePiFor(start, nrOfElements)) // perform the work + case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work } } diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 55cd4dbe66..b1e3071237 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -15,7 +15,6 @@ import akka.routing.LocalConnectionManager; import scala.Option; import akka.actor.ActorRef; import akka.actor.Actors; -import akka.actor.Channel; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Future; @@ -80,7 +79,7 @@ public class Pi { public void onReceive(Object message) { if (message instanceof Work) { Work work = (Work) message; - getChannel().tell(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work + getSender().tell(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work } else throw new IllegalArgumentException("Unknown message [" + message + "]"); } } @@ -127,11 +126,11 @@ public class Pi { router.tell(new Work(arg, nrOfElements), getSelf()); } // Assume the gathering behavior - become(gather(getChannel())); + become(gather(getSender())); } }; - private Procedure gather(final Channel recipient) { + private Procedure gather(final ActorRef recipient) { return new Procedure() { public void apply(Object msg) { // handle result from the worker @@ -174,7 +173,7 @@ public class Pi { // send calculate message long timeout = 60000; - Future replyFuture = master.ask(new Calculate(), timeout, null); + Future replyFuture = master.ask(new Calculate(), timeout); Option result = replyFuture.await().resultOrException(); if (result.isDefined()) { double pi = (Double) result.get(); diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 67841c7a60..51d98bf7f9 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -8,9 +8,9 @@ import akka.actor.Actor._ import akka.event.EventHandler import System.{ currentTimeMillis ⇒ now } import akka.routing.Routing.Broadcast -import akka.actor.{ Timeout, Channel, Actor, PoisonPill } import akka.routing._ import akka.AkkaApplication +import akka.actor.{ ActorRef, Timeout, Actor, PoisonPill } object Pi extends App { @@ -40,8 +40,7 @@ object Pi extends App { } def receive = { - case Work(arg, nrOfElements) ⇒ - channel ! Result(calculatePiFor(arg, nrOfElements)) // perform the work + case Work(arg, nrOfElements) ⇒ sender ! Result(calculatePiFor(arg, nrOfElements)) // perform the work } } @@ -67,11 +66,11 @@ object Pi extends App { for (arg ← 0 until nrOfMessages) router ! Work(arg, nrOfElements) //Assume the gathering behavior - this become gather(channel) + this become gather(sender) } // phase 2, aggregate the results of the Calculation - def gather(recipient: Channel[Any]): Receive = { + def gather(recipient: ActorRef): Receive = { case Result(value) ⇒ // handle result from the worker pi += value