diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java
index 588307ee6c..4952d1b2c9 100644
--- a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java
+++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java
@@ -2,6 +2,6 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) {
- tryReply("got it!");
+ getChannel().tryTell("got it!");
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala
index 895cc31c79..b53f2cd998 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala
@@ -15,7 +15,7 @@ object ActorFireForgetRequestReplySpec {
class ReplyActor extends Actor {
def receive = {
case "Send" ⇒
- reply("Reply")
+ channel ! "Reply"
case "SendImplicit" ⇒
channel ! "ReplyImplicit"
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
index 12456fd3ff..5c9dd2ec2f 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
@@ -25,7 +25,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val currentGen = generationProvider.getAndIncrement()
override def preStart() { report("preStart") }
override def postStop() { report("postStop") }
- def receive = { case "status" ⇒ this reply message("OK") }
+ def receive = { case "status" ⇒ channel ! message("OK") }
}
"An Actor" must {
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
index 747f776f9b..0e727f6e7a 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
@@ -35,7 +35,7 @@ object ActorRefSpec {
val worker = context.actorOf(Props[WorkerActor])
worker ! ReplyTo(channel)
case "workDone" ⇒ replyTo ! "complexReply"
- case "simpleRequest" ⇒ reply("simpleReply")
+ case "simpleRequest" ⇒ channel ! "simpleReply"
}
}
@@ -43,7 +43,7 @@ object ActorRefSpec {
def receive = {
case "work" ⇒ {
work
- reply("workDone")
+ channel ! "workDone"
self.stop()
}
case ReplyTo(replyTo) ⇒ {
@@ -74,7 +74,7 @@ object ActorRefSpec {
class OuterActor(val inner: ActorRef) extends Actor {
def receive = {
- case "self" ⇒ reply(self)
+ case "self" ⇒ channel ! self
case x ⇒ inner forward x
}
}
@@ -83,7 +83,7 @@ object ActorRefSpec {
val fail = new InnerActor
def receive = {
- case "self" ⇒ reply(self)
+ case "self" ⇒ channel ! self
case x ⇒ inner forward x
}
}
@@ -94,8 +94,8 @@ object ActorRefSpec {
class InnerActor extends Actor {
def receive = {
- case "innerself" ⇒ reply(self)
- case other ⇒ reply(other)
+ case "innerself" ⇒ channel ! self
+ case other ⇒ channel ! other
}
}
@@ -103,8 +103,8 @@ object ActorRefSpec {
val fail = new InnerActor
def receive = {
- case "innerself" ⇒ reply(self)
- case other ⇒ reply(other)
+ case "innerself" ⇒ channel ! self
+ case other ⇒ channel ! other
}
}
@@ -321,7 +321,7 @@ class ActorRefSpec extends AkkaSpec {
"support nested actorOfs" in {
val a = actorOf(new Actor {
val nested = actorOf(new Actor { def receive = { case _ ⇒ } })
- def receive = { case _ ⇒ reply(nested) }
+ def receive = { case _ ⇒ channel ! nested }
})
val nested = (a ? "any").as[ActorRef].get
@@ -369,8 +369,8 @@ class ActorRefSpec extends AkkaSpec {
val timeout = Timeout(20000)
val ref = actorOf(Props(new Actor {
def receive = {
- case 5 ⇒ tryReply("five")
- case null ⇒ tryReply("null")
+ case 5 ⇒ channel.tryTell("five")
+ case null ⇒ channel.tryTell("null")
}
}))
diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala
index 9aa5ffc05d..492a3e6680 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala
@@ -44,7 +44,7 @@ object Chameneos {
case Exit ⇒
colour = FADED
- sender.get ! MeetingCount(meetings)
+ sender ! MeetingCount(meetings)
}
def complement(otherColour: Colour): Colour = colour match {
@@ -97,7 +97,7 @@ object Chameneos {
n -= 1
chameneo ! msg
waitingChameneo = None
- case None ⇒ waitingChameneo = sender
+ case None ⇒ waitingChameneo = Some(sender)
}
} else {
waitingChameneo.foreach(_ ! Exit)
diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala
index 897d3a6180..0a775a3684 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala
@@ -12,59 +12,70 @@ import java.util.concurrent.atomic._
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
"The Death Watch" must {
- def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(2 seconds, actorRef + ": Stopped") {
- case Terminated(`actorRef`, ex: ActorKilledException) if ex.getMessage == "Stopped" ⇒ true
+ def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {
+ case Terminated(`actorRef`) ⇒ true
}
"notify with one Terminated message when an Actor is stopped" in {
- val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() }))
+ val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
testActor startsMonitoring terminal
- terminal ! "anything"
+ testActor ! "ping"
+ expectMsg("ping")
+
+ terminal ! PoisonPill
expectTerminationOf(terminal)
-
- terminal.stop()
}
"notify with all monitors with one Terminated message when an Actor is stopped" in {
- val monitor1, monitor2 = actorOf(Props(context ⇒ { case t: Terminated ⇒ testActor ! t }))
- val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() }))
+ val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
+ val monitor1, monitor2, monitor3 =
+ actorOf(Props(new Actor {
+ watch(terminal)
+ def receive = { case t: Terminated ⇒ testActor ! t }
+ }))
- monitor1 startsMonitoring terminal
- monitor2 startsMonitoring terminal
- testActor startsMonitoring terminal
-
- terminal ! "anything"
+ terminal ! PoisonPill
expectTerminationOf(terminal)
expectTerminationOf(terminal)
expectTerminationOf(terminal)
- terminal.stop()
monitor1.stop()
monitor2.stop()
+ monitor3.stop()
}
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
- val monitor1, monitor2 = actorOf(Props(context ⇒ { case t: Terminated ⇒ testActor ! t }))
- val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() }))
+ val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
+ val monitor1, monitor3 =
+ actorOf(Props(new Actor {
+ watch(terminal)
+ def receive = { case t: Terminated ⇒ testActor ! t }
+ }))
+ val monitor2 = actorOf(Props(new Actor {
+ watch(terminal)
+ unwatch(terminal)
+ def receive = {
+ case "ping" ⇒ sender ! "pong"
+ case t: Terminated ⇒ testActor ! t
+ }
+ }))
- monitor1 startsMonitoring terminal
- monitor2 startsMonitoring terminal
- testActor startsMonitoring terminal
+ monitor2 ! "ping"
- monitor2 stopsMonitoring terminal
+ expectMsg("pong") //Needs to be here since startsMonitoring and stopsMonitoring are asynchronous
- terminal ! "anything"
+ terminal ! PoisonPill
expectTerminationOf(terminal)
expectTerminationOf(terminal)
- terminal.stop()
monitor1.stop()
monitor2.stop()
+ monitor3.stop()
}
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
@@ -73,7 +84,10 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val terminalProps = Props(context ⇒ { case x ⇒ context.channel ! x })
val terminal = (supervisor ? terminalProps).as[ActorRef].get
- testActor startsMonitoring terminal
+ val monitor = actorOf(Props(new Actor {
+ watch(terminal)
+ def receive = { case t: Terminated ⇒ testActor ! t }
+ }))
terminal ! Kill
terminal ! Kill
@@ -81,24 +95,34 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
terminal ! Kill
expectTerminationOf(terminal)
+ terminal.isShutdown must be === true
- terminal.stop()
supervisor.stop()
}
}
"fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException], EventFilter[DeathPactException]) {
- val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
+ val supervisor = actorOf(Props[Supervisor]
+ .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
+ override def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = {
+ testActor ! fail
+ super.handleFailure(fail, stats, children)
+ }
+ }))
val failed, brother = (supervisor ? Props.empty).as[ActorRef].get
brother startsMonitoring failed
testActor startsMonitoring brother
failed ! Kill
- expectMsgPF() {
- case Terminated(brother, DeathPactException(failed, _: ActorKilledException)) ⇒ true
+ val result = receiveWhile(3 seconds, messages = 3) {
+ case Failed(`failed`, _: ActorKilledException) ⇒ 1
+ case Failed(`brother`, DeathPactException(`failed`)) ⇒ 2
+ case Terminated(`brother`) ⇒ 3
}
+ testActor must not be 'shutdown
+ result must be(Seq(1, 2, 3))
}
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala
index 6a98ac5448..c72c095ce2 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala
@@ -55,8 +55,12 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
"not fail when listener goes away" in {
val forward = actorOf(new Forwarder(testActor))
- val sup = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
- val fsm = sup startsMonitoring actorOf(new MyFSM(testActor))
+ val fsm = actorOf(new MyFSM(testActor))
+ val sup = actorOf(Props(new Actor {
+ self startsMonitoring fsm
+ def receive = { case _ ⇒ }
+ }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
+
within(300 millis) {
fsm ! SubscribeTransitionCallBack(forward)
expectMsg(CurrentState(fsm, 0))
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala
index ce8bacf12b..55cfbd5fd7 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala
@@ -15,7 +15,7 @@ object ForwardActorSpec {
def createForwardingChain(app: AkkaApplication): ActorRef = {
val replier = app.actorOf(new Actor {
- def receive = { case x ⇒ reply(x) }
+ def receive = { case x ⇒ channel ! x }
})
def mkforwarder(forwardTo: ActorRef) = app.actorOf(
diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
index 78ca017679..d61500c2b4 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
@@ -48,7 +48,7 @@ object IOActorSpec {
def receiveIO = {
case length: Int ⇒
val bytes = socket.read(length)
- reply(bytes)
+ channel ! bytes
}
}
}
@@ -108,9 +108,9 @@ object IOActorSpec {
case msg: NewClient ⇒ createWorker forward msg
case ('set, key: String, value: ByteString) ⇒
kvs += (key -> value)
- tryReply(())
- case ('get, key: String) ⇒ tryReply(kvs.get(key))
- case 'getall ⇒ tryReply(kvs)
+ channel.tryTell(())(self)
+ case ('get, key: String) ⇒ channel.tryTell(kvs.get(key))(self)
+ case 'getall ⇒ channel.tryTell(kvs)(self)
}
}
@@ -123,18 +123,20 @@ object IOActorSpec {
socket = connect(ioManager, host, port)
}
+ def reply(msg: Any) = channel.tryTell(msg)(self)
+
def receiveIO = {
case ('set, key: String, value: ByteString) ⇒
socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
- tryReply(readResult)
+ reply(readResult)
case ('get, key: String) ⇒
socket write ByteString("GET " + key + "\r\n")
- tryReply(readResult)
+ reply(readResult)
case 'getall ⇒
socket write ByteString("GETALL\r\n")
- tryReply(readResult)
+ reply(readResult)
}
def readResult = {
diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
index e767ddeeb7..fda34295bb 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
@@ -62,7 +62,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
app.eventHandler.addListener(testActor)
val actor = TestActorRef(new Actor {
def receive = loggable(this) {
- case _ ⇒ reply("x")
+ case _ ⇒ channel ! "x"
}
})
actor ! "buh"
@@ -91,7 +91,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
app.eventHandler.addListener(testActor)
val actor = TestActorRef(new Actor {
def receive = loggable(this)(loggable(this) {
- case _ ⇒ reply("x")
+ case _ ⇒ channel ! "x"
})
})
actor ! "buh"
diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala
index 159d611c28..af2f84808b 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala
@@ -209,7 +209,7 @@ class RestartStrategySpec extends AkkaSpec {
val boss = actorOf(Props(new Actor {
def receive = {
- case p: Props ⇒ reply(context.actorOf(p))
+ case p: Props ⇒ channel ! context.actorOf(p)
case t: Terminated ⇒ maxNoOfRestartsLatch.open
}
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))
diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala
index 023490da31..14c70933f7 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala
@@ -5,6 +5,6 @@ package akka.actor
class Supervisor extends Actor {
def receive = {
- case x: Props ⇒ reply(context.actorOf(x))
+ case x: Props ⇒ channel ! context.actorOf(x)
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
index cc16c2da08..43bbfc243f 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
@@ -13,7 +13,7 @@ object SupervisorHierarchySpec {
class CountDownActor(countDown: CountDownLatch) extends Actor {
protected def receive = {
- case p: Props ⇒ reply(context.actorOf(p))
+ case p: Props ⇒ channel ! context.actorOf(p)
}
override def postRestart(reason: Throwable) = {
countDown.countDown()
@@ -55,8 +55,8 @@ class SupervisorHierarchySpec extends AkkaSpec {
self startsMonitoring crasher
protected def receive = {
- case "killCrasher" ⇒ crasher ! Kill
- case Terminated(_, _) ⇒ countDownMax.countDown()
+ case "killCrasher" ⇒ crasher ! Kill
+ case Terminated(_) ⇒ countDownMax.countDown()
}
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000)))
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
index cab176d184..97c5461741 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
@@ -21,7 +21,7 @@ class SupervisorMiscSpec extends AkkaSpec {
val workerProps = Props(new Actor {
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
- case "status" ⇒ this.reply("OK")
+ case "status" ⇒ this.channel ! "OK"
case _ ⇒ this.self.stop()
}
})
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
index 7cf0220f26..20b1f92aea 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
@@ -4,16 +4,13 @@
package akka.actor
-import org.scalatest.WordSpec
-import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterEach
-import org.scalatest.BeforeAndAfterAll
import akka.testkit.Testing.sleepFor
import akka.util.duration._
import akka.{ Die, Ping }
import akka.actor.Actor._
import akka.testkit.TestEvent._
-import akka.testkit.EventFilter
+import akka.testkit.{ EventFilter, ImplicitSender }
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.LinkedBlockingQueue
import akka.testkit.AkkaSpec
@@ -42,7 +39,7 @@ object SupervisorSpec {
def receive = {
case Ping ⇒
messageLog.put(PingMessage)
- tryReply(PongMessage)
+ channel.tryTell(PongMessage)
case Die ⇒
throw new RuntimeException(ExceptionMessage)
}
@@ -59,14 +56,14 @@ object SupervisorSpec {
var s: UntypedChannel = _
def receive = {
- case Die ⇒ temp ! Die; s = context.channel
- case Terminated(`temp`, cause) ⇒ s ! cause
+ case Die ⇒ temp ! Die; s = context.channel
+ case Terminated(`temp`) ⇒ s ! "terminated"
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach {
+class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
import SupervisorSpec._
@@ -150,9 +147,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach {
"not restart programmatically linked temporary actor" in {
val master = actorOf(Props[Master].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
- (master.?(Die, TimeoutMillis)).get match {
- case r: RuntimeException ⇒ r === ExceptionMessage
- }
+ master ! Die
+ expectMsg(3 seconds, "terminated")
sleepFor(1 second)
messageLogPoll must be(null)
@@ -298,7 +294,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach {
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
def receive = {
- case Ping ⇒ tryReply(PongMessage)
+ case Ping ⇒ channel.tryTell(PongMessage)
case Die ⇒ throw new RuntimeException("Expected")
}
})
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala
index 897ddc06b0..71392cdf92 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala
@@ -22,7 +22,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender {
within(5 seconds) {
val p = Props(new Actor {
def receive = {
- case p: Props ⇒ this reply context.actorOf(p)
+ case p: Props ⇒ channel ! context.actorOf(p)
}
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.address }
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000))
diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala
index b1f56963b5..57f028ce07 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala
@@ -50,11 +50,11 @@ object Ticket669Spec {
}
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
- tryReply("failure1")
+ channel.tryTell("failure1")
}
override def postStop() {
- tryReply("failure2")
+ channel.tryTell("failure2")
}
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
index c9efc0a0fd..cd524318c6 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
@@ -71,8 +71,8 @@ object ActorModelSpec {
case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff()
- case Reply(msg) ⇒ ack; reply(msg); busy.switchOff()
- case TryReply(msg) ⇒ ack; tryReply(msg); busy.switchOff()
+ case Reply(msg) ⇒ ack; channel ! msg; busy.switchOff()
+ case TryReply(msg) ⇒ ack; channel.tryTell(msg); busy.switchOff()
case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff()
case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
@@ -128,10 +128,10 @@ object ActorModelSpec {
super.unregister(actor)
}
- protected[akka] abstract override def dispatch(invocation: Envelope) {
- val stats = getStats(invocation.receiver.self)
+ protected[akka] abstract override def dispatch(receiver: ActorCell, invocation: Envelope) {
+ val stats = getStats(receiver.self)
stats.msgsReceived.incrementAndGet()
- super.dispatch(invocation)
+ super.dispatch(receiver, invocation)
}
protected[akka] abstract override def start() {
@@ -381,31 +381,11 @@ abstract class ActorModelSpec extends AkkaSpec {
} catch {
case e ⇒
System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num)
- //app.eventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount())
}
}
for (run ← 1 to 3) {
flood(40000)
- try {
- assertDispatcher(dispatcher)(starts = run, stops = run)
- } catch {
- case e ⇒
-
- // FIXME: registry has been removed
- // app.registry.local.foreach {
- // case actor: LocalActorRef ⇒
- // val cell = actor.underlying
- // val mbox = cell.mailbox
- // System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled)
- // var message = mbox.dequeue()
- // while (message ne null) {
- // System.err.println("Lingering message for " + cell + " " + message)
- // message = mbox.dequeue()
- // }
- // }
-
- throw e
- }
+ assertDispatcher(dispatcher)(starts = run, stops = run)
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala
index d677e23fc1..bb5be1ca27 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala
@@ -9,7 +9,7 @@ import akka.actor.{ Props, Actor }
object DispatcherActorSpec {
class TestActor extends Actor {
def receive = {
- case "Hello" ⇒ reply("World")
+ case "Hello" ⇒ channel ! "World"
case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
@@ -67,20 +67,18 @@ class DispatcherActorSpec extends AkkaSpec {
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(100)
- val thereWeAre = new CountDownLatch(1)
val start = new CountDownLatch(1)
val fastOne = actorOf(
Props(context ⇒ { case "sabotage" ⇒ works.set(false) }).withDispatcher(throughputDispatcher))
val slowOne = actorOf(
Props(context ⇒ {
- case "hogexecutor" ⇒ thereWeAre.countDown(); start.await
+ case "hogexecutor" ⇒ context.channel ! "OK"; start.await
case "ping" ⇒ if (works.get) latch.countDown()
}).withDispatcher(throughputDispatcher))
- slowOne ! "hogexecutor"
+ assert((slowOne ? "hogexecutor").get === "OK")
(1 to 100) foreach { _ ⇒ slowOne ! "ping" }
- assert(thereWeAre.await(2, TimeUnit.SECONDS))
fastOne ! "sabotage"
start.countDown()
latch.await(10, TimeUnit.SECONDS)
diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala
index 46d05c18ea..3564ab3dcb 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala
@@ -12,7 +12,7 @@ import org.scalatest.BeforeAndAfterEach
object PinnedActorSpec {
class TestActor extends Actor {
def receive = {
- case "Hello" ⇒ reply("World")
+ case "Hello" ⇒ channel ! "World"
case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
index aef33a4874..6d5f8b9ea1 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -17,20 +17,16 @@ import org.scalatest.junit.JUnitSuite
object FutureSpec {
class TestActor extends Actor {
def receive = {
- case "Hello" ⇒
- reply("World")
- case "NoReply" ⇒ {}
- case "Failure" ⇒
- throw new RuntimeException("Expected exception; to test fault-tolerance")
+ case "Hello" ⇒ channel ! "World"
+ case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance")
+ case "NoReply" ⇒
}
}
class TestDelayActor(await: StandardLatch) extends Actor {
def receive = {
- case "Hello" ⇒
- await.await
- reply("World")
- case "NoReply" ⇒ { await.await }
+ case "Hello" ⇒ await.await; channel ! "World"
+ case "NoReply" ⇒ await.await
case "Failure" ⇒
await.await
throw new RuntimeException("Expected exception; to test fault-tolerance")
@@ -140,7 +136,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"will return a result" must {
behave like futureWithResult { test ⇒
val actor1 = actorOf[TestActor]
- val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ reply(s.toUpperCase) } })
+ val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
future.await
test(future, "WORLD")
@@ -152,7 +148,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
behave like futureWithException[ArithmeticException] { test ⇒
filterException[ArithmeticException] {
val actor1 = actorOf[TestActor]
- val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ reply(s.length / 0) } })
+ val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.length / 0 } })
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
future.await
test(future, "/ by zero")
@@ -165,7 +161,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
behave like futureWithException[MatchError] { test ⇒
filterException[MatchError] {
val actor1 = actorOf[TestActor]
- val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ reply(s.toUpperCase) } })
+ val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ channel ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i }
future.await
test(future, "World (of class java.lang.String)")
@@ -182,8 +178,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
filterException[ClassCastException] {
val actor = actorOf(new Actor {
def receive = {
- case s: String ⇒ reply(s.length)
- case i: Int ⇒ reply((i * 2).toString)
+ case s: String ⇒ channel ! s.length
+ case i: Int ⇒ channel ! (i * 2).toString
}
})
@@ -214,8 +210,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case class Res[T](res: T)
val actor = actorOf(new Actor {
def receive = {
- case Req(s: String) ⇒ reply(Res(s.length))
- case Req(i: Int) ⇒ reply(Res((i * 2).toString))
+ case Req(s: String) ⇒ channel ! Res(s.length)
+ case Req(i: Int) ⇒ channel ! Res((i * 2).toString)
}
})
@@ -301,7 +297,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"fold" in {
val actors = (1 to 10).toList map { _ ⇒
actorOf(new Actor {
- def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); tryReply(add) }
+ def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) }
})
}
val timeout = 10000
@@ -312,7 +308,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"fold by composing" in {
val actors = (1 to 10).toList map { _ ⇒
actorOf(new Actor {
- def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); tryReply(add) }
+ def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) }
})
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] }
@@ -327,7 +323,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case (add: Int, wait: Int) ⇒
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
- tryReply(add)
+ channel.tryTell(add)
}
})
}
@@ -359,7 +355,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"shouldReduceResults" in {
val actors = (1 to 10).toList map { _ ⇒
actorOf(new Actor {
- def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); tryReply(add) }
+ def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); channel.tryTell(add) }
})
}
val timeout = 10000
@@ -375,7 +371,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case (add: Int, wait: Int) ⇒
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
- tryReply(add)
+ channel.tryTell(add)
}
})
}
@@ -404,7 +400,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
var counter = 1
def receive = {
case 'GetNext ⇒
- reply(counter)
+ channel ! counter
counter += 2
}
})
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
index b51896f659..b4cc86cca2 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
@@ -80,12 +80,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
result
}
- def createMessageInvocation(msg: Any): Envelope = {
- new Envelope(
- actorOf(new Actor { //Dummy actor
- def receive = { case _ ⇒ }
- }).asInstanceOf[LocalActorRef].underlying, msg, NullChannel)
- }
+ def createMessageInvocation(msg: Any): Envelope = Envelope(msg, NullChannel)
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala
index a54f739e23..e3ab6d2ed7 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala
@@ -29,7 +29,7 @@ class PriorityDispatcherSpec extends AkkaSpec {
def receive = {
case i: Int ⇒ acc = i :: acc
- case 'Result ⇒ tryReply(acc)
+ case 'Result ⇒ channel.tryTell(acc)
}
}).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef]
diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala
index 15b23dec0f..cf88186157 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala
@@ -41,7 +41,7 @@ class ActorPoolSpec extends AkkaSpec {
case _ ⇒
count.incrementAndGet
latch.countDown()
- tryReply("success")
+ channel.tryTell("success")
}
}))
@@ -88,7 +88,7 @@ class ActorPoolSpec extends AkkaSpec {
def receive = {
case req: String ⇒ {
sleepFor(10 millis)
- tryReply("Response")
+ channel.tryTell("Response")
}
}
}))
diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala
index 5e5da7ee23..ca566f726a 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala
@@ -73,7 +73,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
val actor = app.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
- case "hit" ⇒ reply(id)
+ case "hit" ⇒ channel ! id
case "end" ⇒ doneLatch.countDown()
}
}), address)
@@ -187,7 +187,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
val actor = app.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
- case "hit" ⇒ reply(id)
+ case "hit" ⇒ channel ! id
case "end" ⇒ doneLatch.countDown()
}
}), address)
diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 5937597149..dab51d076f 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -458,7 +458,7 @@ class RoutingSpec extends AkkaSpec {
case Stop(None) ⇒ self.stop()
case Stop(Some(_id)) if (_id == id) ⇒ self.stop()
case _id: Int if (_id == id) ⇒
- case _ ⇒ Thread sleep 100 * id; tryReply(id)
+ case _ ⇒ Thread sleep 100 * id; channel.tryTell(id)
}
override def postStop = {
diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
index 0367956c50..5e46f3ec05 100644
--- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
+++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
@@ -21,7 +21,7 @@ class Ticket703Spec extends AkkaSpec {
def receive = {
case req: String ⇒
Thread.sleep(6000L)
- tryReply("Response")
+ channel.tryTell("Response")
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala
index cedfc56c32..3fcc09d7cf 100644
--- a/akka-actor/src/main/scala/akka/AkkaApplication.scala
+++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala
@@ -5,6 +5,7 @@ package akka
import akka.config._
import akka.actor._
+import event._
import java.net.InetAddress
import com.eaio.uuid.UUID
import akka.dispatch.{ Dispatcher, Dispatchers, Future, DefaultPromise }
@@ -16,9 +17,6 @@ import akka.dispatch.UnboundedMailbox
import akka.routing.Routing
import akka.remote.RemoteSupport
import akka.serialization.Serialization
-import akka.event.EventHandler
-import akka.event.EventHandlerLogging
-import akka.event.Logging
import java.net.InetSocketAddress
object AkkaApplication {
@@ -119,6 +117,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
val RemoteTransport = getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport")
val RemoteServerPort = getInt("akka.remote.server.port", 2552)
+
+ val FailureDetectorThreshold: Int = getInt("akka.remote.failure-detector.threshold", 8)
+ val FailureDetectorMaxSampleSize: Int = getInt("akka.remote.failure-detector.max-sample-size", 1000)
}
object MistSettings {
@@ -193,6 +194,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
val log: Logging = new EventHandlerLogging(eventHandler, this)
// TODO think about memory consistency effects when doing funky stuff inside constructor
+ val deadLetters = new DeadLetterActorRef(this)
+
+ // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor
val deployer = new Deployer(this)
val deathWatch = provider.createDeathWatch()
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 4413a05e29..d55c330840 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -55,7 +55,7 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
case class Failed(@BeanProperty actor: ActorRef,
@BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
-case class ChildTerminated(@BeanProperty child: ActorRef, @BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
+case class ChildTerminated(@BeanProperty child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful
@@ -63,7 +63,7 @@ case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful
case object Kill extends AutoReceivedMessage with PossiblyHarmful
-case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable) extends PossiblyHarmful
+case class Terminated(@BeanProperty actor: ActorRef) extends PossiblyHarmful
case object ReceiveTimeout extends PossiblyHarmful
@@ -95,8 +95,8 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
def this(msg: String) = this(msg, null);
}
-case class DeathPactException private[akka] (dead: ActorRef, cause: Throwable)
- extends AkkaException("monitored actor " + dead + " terminated", cause)
+case class DeathPactException private[akka] (dead: ActorRef)
+ extends AkkaException("monitored actor " + dead + " terminated")
with NoStackTrace
// must not pass InterruptedException to other threads
@@ -175,6 +175,11 @@ object Actor {
case _ ⇒ new LoggingReceive(source, r)
}
}
+
+ object emptyBehavior extends Receive {
+ def isDefinedAt(x: Any) = false
+ def apply(x: Any) = throw new UnsupportedOperationException("empty behavior apply()")
+ }
}
/**
@@ -223,12 +228,10 @@ trait Actor {
implicit def app = context.app
- private def config = context.app.AkkaConfig
-
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
*/
- implicit def defaultTimeout = config.ActorTimeout
+ implicit def defaultTimeout = app.AkkaConfig.ActorTimeout
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
@@ -244,7 +247,7 @@ trait Actor {
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
- def loggable(self: AnyRef)(r: Receive): Receive = if (config.AddLoggingReceive) LoggingReceive(self, r) else r
+ def loggable(self: AnyRef)(r: Receive): Receive = if (app.AkkaConfig.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait?
/**
* Some[ActorRef] representation of the 'self' ActorRef reference.
@@ -252,7 +255,7 @@ trait Actor {
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
- def someSelf: Some[ActorRef with ScalaActorRef] = Some(context.self)
+ def someSelf: Some[ActorRef with ScalaActorRef] = Some(context.self) //TODO FIXME we might not need this when we switch to sender-in-scope-always
/*
* Option[ActorRef] representation of the 'self' ActorRef reference.
@@ -260,7 +263,7 @@ trait Actor {
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!' and '?').
*/
- def optionSelf: Option[ActorRef with ScalaActorRef] = someSelf
+ def optionSelf: Option[ActorRef with ScalaActorRef] = someSelf //TODO FIXME we might not need this when we switch to sender-in-scope-always
/**
* The 'self' field holds the ActorRef for this actor.
@@ -276,20 +279,15 @@ trait Actor {
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
- def sender: Option[ActorRef] = context.sender
-
- /**
- * The reference sender future of the last received message.
- * Is defined if the message was sent with sent with '?'/'ask', else None.
- */
- def senderFuture(): Option[Promise[Any]] = context.senderFuture
+ @inline
+ final def sender: ActorRef = context.sender
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
def channel: UntypedChannel = context.channel
- // just for current compatibility
+ // TODO FIXME REMOVE ME just for current compatibility
implicit def forwardable: ForwardableChannel = ForwardableChannel(channel)
/**
@@ -306,31 +304,6 @@ trait Actor {
*/
def receiveTimeout_=(timeout: Option[Long]) = context.receiveTimeout = timeout
- /**
- * Akka Scala & Java API
- * Use reply(..) to reply with a message to the original sender of the message currently
- * being processed. This method fails if the original sender of the message could not be determined with an
- * IllegalStateException.
- *
- * If you don't want deal with this IllegalStateException, but just a boolean, just use the tryReply(...)
- * version.
- *
- *
tryReply(..) to try reply with a message to the original sender of the message currently
- * being processed. This method
- *
- * Returns true if reply was sent, and false if unable to determine what to reply to.
- *
- * If you would rather have an exception, check the reply(..) version.
- */
- def tryReply(message: Any): Boolean = channel.tryTell(message)(self)
-
/**
* Same as ActorContext.children
*/
@@ -352,7 +325,7 @@ trait Actor {
* def receive = {
* case Ping =>
* println("got a 'Ping' message")
- * reply("pong")
+ * channel ! "pong"
*
* case OneWay =>
* println("got a 'OneWay' message")
@@ -403,8 +376,8 @@ trait Actor {
*/
def unhandled(message: Any) {
message match {
- case Terminated(dead, cause) ⇒ throw new DeathPactException(dead, cause)
- case _ ⇒ throw new UnhandledMessageException(message, self)
+ case Terminated(dead) ⇒ throw new DeathPactException(dead)
+ case _ ⇒ throw new UnhandledMessageException(message, self)
}
}
@@ -426,16 +399,26 @@ trait Actor {
if (h.nonEmpty) context.hotswap = h.pop
}
+ /**
+ * Registers this actor as a Monitor for the provided ActorRef
+ * @returns the provided ActorRef
+ */
+ def watch(subject: ActorRef): ActorRef = self startsMonitoring subject
+
+ /**
+ * Unregisters this actor as Monitor for the provided ActorRef
+ * @returns the provided ActorRef
+ */
+ def unwatch(subject: ActorRef): ActorRef = self stopsMonitoring subject
+
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] final def apply(msg: Any) = {
- if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
- throw new InvalidMessageException("Message from [" + channel + "] to [" + self + "] is null")
def autoReceiveMessage(msg: AutoReceivedMessage) {
- if (config.DebugAutoReceive) app.eventHandler.debug(this, "received AutoReceiveMessage " + msg)
+ if (app.AkkaConfig.DebugAutoReceive) app.eventHandler.debug(this, "received AutoReceiveMessage " + msg)
msg match {
case HotSwap(code, discardOld) ⇒ become(code(self), discardOld)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index 2be9eef251..263e044e2d 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -34,9 +34,7 @@ private[akka] trait ActorContext extends ActorRefFactory with TypedActorFactory
def currentMessage_=(invocation: Envelope): Unit
- def sender: Option[ActorRef]
-
- def senderFuture(): Option[Promise[Any]]
+ def sender: ActorRef
def channel: UntypedChannel
@@ -126,7 +124,7 @@ private[akka] class ActorCell(
def children: Iterable[ActorRef] = _children.keys
- def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel)
+ def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher.dispatch(this, Envelope(message, channel))
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
@@ -136,20 +134,14 @@ private[akka] class ActorCell(
case f: ActorPromise ⇒ f
case _ ⇒ new ActorPromise(timeout)(dispatcher)
}
- dispatcher dispatch Envelope(this, message, future)
+ dispatcher.dispatch(this, Envelope(message, future))
future
}
- def sender: Option[ActorRef] = currentMessage match {
- case null ⇒ None
- case msg if msg.channel.isInstanceOf[ActorRef] ⇒ Some(msg.channel.asInstanceOf[ActorRef])
- case _ ⇒ None
- }
-
- def senderFuture(): Option[Promise[Any]] = currentMessage match {
- case null ⇒ None
- case msg if msg.channel.isInstanceOf[ActorPromise] ⇒ Some(msg.channel.asInstanceOf[ActorPromise])
- case _ ⇒ None
+ def sender: ActorRef = currentMessage match {
+ case null ⇒ app.deadLetters
+ case msg if msg.channel.isInstanceOf[ActorRef] ⇒ msg.channel.asInstanceOf[ActorRef]
+ case _ ⇒ app.deadLetters
}
def channel: UntypedChannel = currentMessage match {
@@ -250,10 +242,8 @@ private[akka] class ActorCell(
}
} finally {
try {
- // when changing this, remember to update the match in the BubbleWalker
- val cause = new ActorKilledException("Stopped") //FIXME TODO make this an object, can be reused everywhere
- supervisor ! ChildTerminated(self, cause)
- app.deathWatch.publish(Terminated(self, cause))
+ supervisor ! ChildTerminated(self)
+ app.deathWatch.publish(Terminated(self))
} finally {
currentMessage = null
clearActorContext()
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index ff5bae9044..1cbfbb2508 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -9,6 +9,7 @@ import akka.util._
import scala.collection.immutable.Stack
import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.AkkaApplication
+import akka.event.ActorEventBus
/**
* ActorRef is an immutable and serializable handle to an Actor.
@@ -68,7 +69,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
* Use this method with care. In most cases it is better to use 'tell' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
*
- * If you are sending messages using ask then you have to use getContext().reply(..)
+ * If you are sending messages using ask then you have to use getContext().channel().tell(...)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] =
@@ -361,6 +362,8 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
}
+case class DeadLetter(message: Any, channel: UntypedChannel)
+
class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher)
override val address: String = "akka:internal:DeadLetterActorRef"
@@ -375,10 +378,10 @@ class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef {
override def stop(): Unit = ()
- protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.debug(this, message)
+ protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.notify(DeadLetter(message, channel))
protected[akka] override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Timeout,
- channel: UntypedChannel): Future[Any] = { app.eventHandler.debug(this, message); brokenPromise }
+ channel: UntypedChannel): Future[Any] = { app.eventHandler.notify(DeadLetter(message, channel)); brokenPromise }
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index 4b5c0ebf32..65d5cb21cd 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -100,12 +100,9 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
protected[akka] override def postMessageToMailbox(msg: Any, channel: UntypedChannel) {
msg match {
- case Failed(child, ex) ⇒ child.stop()
- case ChildTerminated(child, ex) ⇒ ex match {
- case a: ActorKilledException if a.getMessage == "Stopped" ⇒ terminationFuture.completeWithResult(AkkaApplication.Stopped)
- case x ⇒ terminationFuture.completeWithResult(AkkaApplication.Failed(x))
- }
- case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg)
+ case Failed(child, ex) ⇒ child.stop()
+ case ChildTerminated(child) ⇒ terminationFuture.completeWithResult(AkkaApplication.Stopped)
+ case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg)
}
}
@@ -224,7 +221,7 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
if (!super.subscribe(subscriber, to)) {
- subscriber ! Terminated(subscriber, new ActorKilledException("Already terminated when linking"))
+ subscriber ! Terminated(to)
false
} else true
}
diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala
index f1dbd61a16..3ea53364ac 100644
--- a/akka-actor/src/main/scala/akka/actor/FSM.scala
+++ b/akka-actor/src/main/scala/akka/actor/FSM.scala
@@ -502,7 +502,7 @@ trait FSM[S, D] extends ListenerManagement {
nextState.stopReason match {
case None ⇒ makeTransition(nextState)
case _ ⇒
- nextState.replies.reverse foreach reply
+ nextState.replies.reverse foreach { r ⇒ channel ! r }
terminate(nextState)
self.stop()
}
@@ -512,7 +512,7 @@ trait FSM[S, D] extends ListenerManagement {
if (!stateFunctions.contains(nextState.stateName)) {
terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
} else {
- nextState.replies.reverse foreach reply
+ nextState.replies.reverse foreach { r ⇒ channel ! r }
if (currentState.stateName != nextState.stateName) {
handleTransition(currentState.stateName, nextState.stateName)
notifyListeners(Transition(self, currentState.stateName, nextState.stateName))
diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala
index 873a9f1451..e3e662726e 100644
--- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala
+++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala
@@ -133,7 +133,7 @@ abstract class FaultHandlingStrategy {
/**
* Returns whether it processed the failure or not
*/
- final def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = {
+ def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = {
val cause = fail.cause
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match {
diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala
index b3ae7f27ef..393c442b51 100644
--- a/akka-actor/src/main/scala/akka/actor/Props.scala
+++ b/akka-actor/src/main/scala/akka/actor/Props.scala
@@ -42,7 +42,7 @@ object Props {
*/
def apply(): Props = default
- def empty = Props(context ⇒ { case null ⇒ })
+ val empty = Props(new Actor { def receive = Actor.emptyBehavior })
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala
index 1345f436d3..bb8cf568f0 100644
--- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -130,7 +130,15 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
- typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, interface.getClassLoader)
+ typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomAddress, interface.getClassLoader)
+
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
+ * all interfaces (Class.getInterfaces) if it's not an interface class
+ */
+ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, address: String): R =
+ typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, address, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
@@ -138,10 +146,31 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
- typedActor.createProxyAndTypedActor(this, interface, impl.create, props, interface.getClassLoader)
+ typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomAddress, interface.getClassLoader)
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
+ * all interfaces (Class.getInterfaces) if it's not an interface class
+ */
+ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, address: String): R =
+ typedActor.createProxyAndTypedActor(this, interface, impl.create, props, address, interface.getClassLoader)
+
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
+ * all interfaces (Class.getInterfaces) if it's not an interface class
+ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
- typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, loader)
+ typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomAddress, loader)
+
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
+ * all interfaces (Class.getInterfaces) if it's not an interface class
+ */
+ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, address: String, loader: ClassLoader): R =
+ typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, address, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
@@ -149,44 +178,73 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
- typedActor.createProxyAndTypedActor(this, interface, impl.create, props, loader)
+ typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomAddress, loader)
+
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
+ * all interfaces (Class.getInterfaces) if it's not an interface class
+ */
+ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, address: String, loader: ClassLoader): R =
+ typedActor.createProxyAndTypedActor(this, interface, impl.create, props, address, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
- typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, loader)
+ typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Props.randomAddress, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
- def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
+ def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, address: String, loader: ClassLoader): R =
+ typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, address, loader)
+
+ /**
+ * Creates a new TypedActor proxy using the supplied Props,
+ * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
+ */
+ def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), address: String = Props.randomAddress, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
- typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, if (loader eq null) clazz.getClassLoader else loader)
+ typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, address, if (loader eq null) clazz.getClassLoader else loader)
}
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
- def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
- typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, if (loader eq null) m.erasure.getClassLoader else loader)
+ def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), address: String = Props.randomAddress, loader: ClassLoader = null)(implicit m: Manifest[R]): R =
+ typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Props.randomAddress, if (loader eq null) m.erasure.getClassLoader else loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
- typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, loader)
+ typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Props.randomAddress, loader)
+
+ /**
+ * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
+ * to create TypedActor proxies, use typedActorOf
+ */
+ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, address: String, loader: ClassLoader): R =
+ typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, address, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R =
- typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, loader)
+ typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Props.randomAddress, loader)
+
+ /**
+ * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
+ * to create TypedActor proxies, use typedActorOf
+ */
+ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, address: String, loader: ClassLoader): R =
+ typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, address, loader)
}
@@ -244,15 +302,15 @@ class TypedActor(val app: AkkaApplication) {
}
else null
- private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, loader: ClassLoader): R = {
+ private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, address: String, loader: ClassLoader): R = {
val proxyVar = new AtomVar[R]
- configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), loader)
+ configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), address, loader)
}
- private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, loader: ClassLoader): R =
- createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, loader)
+ private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, address: String, loader: ClassLoader): R =
+ createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, address, loader)
- private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = {
+ private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, address: String, loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val timeout = props.timeout match {
@@ -261,7 +319,7 @@ class TypedActor(val app: AkkaApplication) {
}
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
- val ref = supervisor.actorOf(props)
+ val ref = supervisor.actorOf(props, address)
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
@@ -286,7 +344,7 @@ class TypedActor(val app: AkkaApplication) {
case p: ActorPromise ⇒ p completeWith m(me).asInstanceOf[Future[Any]]
case _ ⇒ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply")
}
- } else reply(m(me))
+ } else channel ! m(me)
} finally {
TypedActor.selfReference set null
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
index 5016b0c590..ce7ce1a3f2 100644
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
@@ -21,17 +21,13 @@ import akka.dispatch.{ MessageDispatcher, Promise }
*
* if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'reply' method
- * reply(msg + ":" + getSelf().getAddress());
+ * getContext().getChannel().tell(msg + ":" + getSelf().getAddress());
*
* } else if (msg.equals("UseSender") && getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own reference (the self)
* getSender().get().tell(msg, getSelf());
*
- * } else if (msg.equals("UseSenderFuture") && getSenderFuture().isDefined()) {
- * // Reply to original sender of message using the sender future reference
- * getSenderFuture().get().completeWithResult(msg);
- *
* } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively
* getSelf().tell(msg)
@@ -71,13 +67,7 @@ abstract class UntypedActor extends Actor {
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
- def getSender: Option[ActorRef] = sender
-
- /**
- * The reference sender future of the last received message.
- * Is defined if the message was sent with sent with '?'/'ask', else None.
- */
- def getSenderFuture: Option[Promise[Any]] = senderFuture
+ def getSender: ActorRef = sender
/**
* Abstraction for unification of sender and senderFuture for later reply
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index bb67b618b8..f89451b962 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -17,12 +17,8 @@ import scala.annotation.tailrec
/**
* @author Jonas Bonér
*/
-final case class Envelope(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
- if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
-
- final def invoke() {
- receiver invoke this
- }
+final case class Envelope(val message: Any, val channel: UntypedChannel) {
+ if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null")
}
object SystemMessage {
@@ -295,7 +291,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
- protected[akka] def dispatch(invocation: Envelope)
+ protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope)
/**
* Suggest to register the provided mailbox for execution
diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
index f38e8d8f54..93e7c99e7b 100644
--- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
@@ -57,12 +57,7 @@ class BalancingDispatcher(
class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue {
final def enqueue(handle: Envelope) = messageQueue.enqueue(handle)
- final def dequeue(): Envelope = {
- val envelope = messageQueue.dequeue()
- if (envelope eq null) null
- else if (envelope.receiver eq actor) envelope
- else envelope.copy(receiver = actor)
- }
+ final def dequeue(): Envelope = messageQueue.dequeue()
final def numberOfMessages: Int = messageQueue.numberOfMessages
@@ -95,6 +90,8 @@ class BalancingDispatcher(
while (messages ne null) {
deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue
messages = messages.next
+ if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox
+ messages = mailBox.systemDrain()
}
}
}
@@ -106,8 +103,7 @@ class BalancingDispatcher(
} else true
}
- override protected[akka] def dispatch(invocation: Envelope) = {
- val receiver = invocation.receiver
+ override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue enqueue invocation
val buddy = buddies.pollFirst()
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 53d28a79a3..f9819c3cd3 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -78,8 +78,8 @@ class Dispatcher(
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
- protected[akka] def dispatch(invocation: Envelope) = {
- val mbox = invocation.receiver.mailbox
+ protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
+ val mbox = receiver.mailbox
mbox enqueue invocation
registerForExecution(mbox, true, false)
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
index 0885839b3e..7beb0ffe95 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
@@ -30,7 +30,7 @@ private[dispatch] object Mailbox {
// secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4
- // static constant for enabling println debugging of message processing (for hardcore bugs)
+ // mailbox debugging helper using println (see below)
final val debug = false
}
@@ -172,8 +172,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
do {
- if (debug) println(actor + " processing message " + nextMessage.message + " from " + nextMessage.channel)
- nextMessage.invoke
+ if (debug) println(actor + " processing message " + nextMessage)
+ actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages
@@ -185,7 +185,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
} else null //Abort
} while (nextMessage ne null)
} else { //If we only run one message per process
- nextMessage.invoke //Just run it
+ actor invoke nextMessage //Just run it
processAllSystemMessages() //After we're done, process all system messages
}
}
@@ -202,7 +202,6 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
// don’t ever execute normal message when system message present!
if (nextMessage eq null) nextMessage = systemDrain()
}
- if (debug) println(actor + " has finished processing system messages")
} catch {
case e ⇒
actor.app.eventHandler.error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")
@@ -244,7 +243,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(message: SystemMessage): Unit = {
- if (Mailbox.debug) println(actor + " having system message enqueued: " + message)
+ if (Mailbox.debug) println(actor + " having enqueued " + message)
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within
diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala
index 33319fbb13..9dd76f5344 100644
--- a/akka-actor/src/main/scala/akka/event/EventBus.scala
+++ b/akka-actor/src/main/scala/akka/event/EventBus.scala
@@ -182,10 +182,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
if (monitored.isShutdown) false
else {
if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor)
- else {
- if (monitored.isShutdown) !dissociate(monitored, monitor)
- else true
- }
+ else if (monitored.isShutdown) !dissociate(monitored, monitor) else true
}
case raw: Vector[_] ⇒
val v = raw.asInstanceOf[Vector[ActorRef]]
@@ -194,10 +191,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
else {
val added = v :+ monitor
if (!mappings.replace(monitored, v, added)) associate(monitored, monitor)
- else {
- if (monitored.isShutdown) !dissociate(monitored, monitor)
- else true
- }
+ else if (monitored.isShutdown) !dissociate(monitored, monitor) else true
}
}
}
@@ -241,13 +235,11 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
case raw: Vector[_] ⇒
val v = raw.asInstanceOf[Vector[ActorRef]]
val removed = v.filterNot(monitor ==)
- if (removed eq v) false
+ if (removed eq raw) false
else if (removed.isEmpty) {
- if (!mappings.remove(monitored, v)) dissociate(monitored, monitor)
- else true
+ if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true
} else {
- if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor)
- else true
+ if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) else true
}
}
}
@@ -263,10 +255,8 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
protected def mapSize: Int
def publish(event: Event): Unit = mappings.get(classify(event)) match {
- case null ⇒
- case raw: Vector[_] ⇒
- val v = raw.asInstanceOf[Vector[ActorRef]]
- v foreach { _ ! event }
+ case null ⇒
+ case raw: Vector[_] ⇒ raw.asInstanceOf[Vector[ActorRef]] foreach { _ ! event }
}
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber)
diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala
index 80230e73ff..69ae417f53 100644
--- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala
+++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala
@@ -14,6 +14,7 @@ import java.net.InetSocketAddress
/**
* An Iterable that also contains a version.
*/
+// FIXME REMOVE VersionedIterable
trait VersionedIterable[A] {
val version: Long
diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala
index 3554cfb1af..9ea8dc78a8 100644
--- a/akka-actor/src/main/scala/akka/routing/Pool.scala
+++ b/akka-actor/src/main/scala/akka/routing/Pool.scala
@@ -103,8 +103,8 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
protected def _route(): Actor.Receive = {
// for testing...
case Stat ⇒
- tryReply(Stats(_delegates length))
- case Terminated(victim, _) ⇒
+ channel.tryTell(Stats(_delegates length))
+ case Terminated(victim) ⇒
_delegates = _delegates filterNot { victim == }
case msg ⇒
resizeIfAppropriate()
diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala
index 4e5d92aa22..eb4204ce0e 100644
--- a/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala
+++ b/akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala
@@ -24,7 +24,7 @@ object TypedCamelTestSupport {
def countdown: Handler = {
case SetExpectedMessageCount(num) ⇒ {
latch = new CountDownLatch(num)
- reply(latch)
+ channel ! latch
}
case msg ⇒ latch.countDown
}
@@ -32,7 +32,7 @@ object TypedCamelTestSupport {
trait Respond { this: Actor ⇒
def respond: Handler = {
- case msg: Message ⇒ reply(response(msg))
+ case msg: Message ⇒ channel ! response(msg)
}
def response(msg: Message): Any = "Hello %s" format msg.body
@@ -42,8 +42,8 @@ object TypedCamelTestSupport {
val messages = Buffer[Any]()
def retain: Handler = {
- case GetRetainedMessage ⇒ reply(messages.last)
- case GetRetainedMessages(p) ⇒ reply(messages.toList.filter(p))
+ case GetRetainedMessage ⇒ channel ! messages.last
+ case GetRetainedMessages(p) ⇒ channel ! messages.filter(p).toList
case msg ⇒ {
messages += msg
msg
diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
index b02587f236..062c6246b5 100644
--- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
@@ -127,11 +127,11 @@ private[camel] class ActivationTracker extends Actor {
def receive = {
case SetExpectedActivationCount(num) ⇒ {
activationLatch = new CountDownLatch(num)
- reply(activationLatch)
+ channel ! activationLatch
}
case SetExpectedDeactivationCount(num) ⇒ {
deactivationLatch = new CountDownLatch(num)
- reply(deactivationLatch)
+ channel ! deactivationLatch
}
case EndpointActivated ⇒ activationLatch.countDown
case EndpointDeactivated ⇒ deactivationLatch.countDown
diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala
index b29d00e7e5..bdc0079251 100644
--- a/akka-camel/src/main/scala/akka/camel/Producer.scala
+++ b/akka-camel/src/main/scala/akka/camel/Producer.scala
@@ -159,7 +159,7 @@ trait ProducerSupport { this: Actor ⇒
* actor).
*/
protected def receiveAfterProduce: Receive = {
- case msg ⇒ if (!oneway) reply(msg)
+ case msg ⇒ if (!oneway) channel ! msg
}
/**
diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java
index d86d4d5cc7..93d0427902 100644
--- a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java
+++ b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java
@@ -15,7 +15,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
- tryReply(String.format("%s %s", body, header));
+ channel.tryTell(String.format("%s %s", body, header));
}
}
diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java
index 0e58677f54..db3ea7665d 100644
--- a/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java
+++ b/akka-camel/src/test/java/akka/camel/SampleUntypedConsumerBlocking.java
@@ -17,7 +17,7 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
- tryReply(String.format("%s %s", body, header));
+ channel.tryTell(String.format("%s %s", body, header));
}
}
diff --git a/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala b/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala
index be5099d050..d099dba708 100644
--- a/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala
+++ b/akka-camel/src/test/scala/akka/camel/CamelTestSupport.scala
@@ -36,7 +36,7 @@ object CamelTestSupport {
def countdown: Handler = {
case SetExpectedMessageCount(num) ⇒ {
latch = new CountDownLatch(num)
- reply(latch)
+ channel ! latch
}
case msg ⇒ latch.countDown
}
@@ -44,7 +44,7 @@ object CamelTestSupport {
trait Respond { this: Actor ⇒
def respond: Handler = {
- case msg: Message ⇒ reply(response(msg))
+ case msg: Message ⇒ channel ! response(msg)
}
def response(msg: Message): Any = "Hello %s" format msg.body
@@ -54,8 +54,8 @@ object CamelTestSupport {
val messages = Buffer[Any]()
def retain: Handler = {
- case GetRetainedMessage ⇒ reply(messages.last)
- case GetRetainedMessages(p) ⇒ reply(messages.toList.filter(p))
+ case GetRetainedMessage ⇒ channel ! messages.last
+ case GetRetainedMessages(p) ⇒ channel ! messages.filter(p).toList
case msg ⇒ {
messages += msg
msg
diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala
index 7ee64f5d65..7a83e09de7 100644
--- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala
@@ -211,7 +211,7 @@ object ConsumerScalaTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
- case msg: Message ⇒ reply("received %s" format msg.body)
+ case msg: Message ⇒ channel ! "received %s" format msg.body
}
}
@@ -226,7 +226,7 @@ object ConsumerScalaTest {
def endpointUri = uri
override def autoack = false
protected def receive = {
- case msg: Message ⇒ reply(Ack)
+ case msg: Message ⇒ channel ! Ack
}
}
@@ -247,15 +247,15 @@ object ConsumerScalaTest {
protected def receive = {
case "fail" ⇒ { throw new Exception("test") }
- case "succeed" ⇒ reply("ok")
+ case "succeed" ⇒ channel ! "ok"
}
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
- tryReply("pr")
+ channel.tryTell("pr")
}
override def postStop {
- tryReply("ps")
+ channel.tryTell("ps")
}
}
@@ -288,7 +288,7 @@ object ConsumerScalaTest {
}
private def respondTo(msg: Message) =
- if (valid) reply("accepted: %s" format msg.body)
+ if (valid) channel ! ("accepted: %s" format msg.body)
else throw new Exception("rejected: %s" format msg.body)
}
diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
index d7fbc783f0..ad1ec6b5cf 100644
--- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
@@ -253,18 +253,16 @@ object ProducerFeatureTest {
class TestResponder extends Actor {
protected def receive = {
case msg: Message ⇒ msg.body match {
- case "fail" ⇒ reply(Failure(new Exception("failure"), msg.headers))
- case _ ⇒ reply(msg.transformBody { body: String ⇒ "received %s" format body })
+ case "fail" ⇒ channel ! Failure(new Exception("failure"), msg.headers)
+ case _ ⇒ channel ! (msg.transformBody { body: String ⇒ "received %s" format body })
}
}
}
class ReplyingForwardTarget extends Actor {
protected def receive = {
- case msg: Message ⇒
- reply(msg.addHeader("test" -> "result"))
- case msg: Failure ⇒
- reply(Failure(msg.cause, msg.headers + ("test" -> "failure")))
+ case msg: Message ⇒ channel ! msg.addHeader("test" -> "result")
+ case msg: Failure ⇒ channel ! Failure(msg.cause, msg.headers + ("test" -> "failure"))
}
}
diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala
index d2b4067e2b..c05ceffaaa 100644
--- a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala
@@ -96,13 +96,13 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
object ActorComponentFeatureTest {
class CustomIdActor extends Actor {
protected def receive = {
- case msg: Message ⇒ reply("Received %s" format msg.body)
+ case msg: Message ⇒ channel ! ("Received %s" format msg.body)
}
}
class FailWithMessage extends Actor {
protected def receive = {
- case msg: Message ⇒ reply(Failure(new Exception("test")))
+ case msg: Message ⇒ channel ! Failure(new Exception("test"))
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 0dcd0447aa..79f740b3b7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -47,7 +47,7 @@ import akka.cluster.metrics._
import akka.cluster.zookeeper._
import ChangeListener._
import RemoteProtocol._
-import RemoteDaemonMessageType._
+import RemoteSystemDaemonMessageType._
import com.eaio.uuid.UUID
@@ -818,7 +818,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
- val builder = RemoteDaemonMessageProtocol.newBuilder
+ val builder = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
@@ -882,7 +882,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
- val command = RemoteDaemonMessageProtocol.newBuilder
+ val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorAddress(actorAddress)
.build
@@ -1030,7 +1030,7 @@ class DefaultClusterNode private[akka] (
Serialization.serialize(f) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
- val message = RemoteDaemonMessageProtocol.newBuilder
+ val message = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
@@ -1046,7 +1046,7 @@ class DefaultClusterNode private[akka] (
Serialization.serialize(f) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
- val message = RemoteDaemonMessageProtocol.newBuilder
+ val message = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
@@ -1063,7 +1063,7 @@ class DefaultClusterNode private[akka] (
Serialization.serialize((f, arg)) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
- val message = RemoteDaemonMessageProtocol.newBuilder
+ val message = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
@@ -1080,7 +1080,7 @@ class DefaultClusterNode private[akka] (
Serialization.serialize((f, arg)) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
- val message = RemoteDaemonMessageProtocol.newBuilder
+ val message = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
@@ -1151,7 +1151,7 @@ class DefaultClusterNode private[akka] (
// Private
// =======================================
- private def sendCommandToNode(connection: ActorRef, command: RemoteDaemonMessageProtocol, async: Boolean = true) {
+ private def sendCommandToNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, async: Boolean = true) {
if (async) {
connection ! command
} else {
@@ -1442,7 +1442,7 @@ class DefaultClusterNode private[akka] (
case Left(error) ⇒ throw error
case Right(bytes) ⇒
- val command = RemoteDaemonMessageProtocol.newBuilder
+ val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(bytes))
.build
@@ -1713,7 +1713,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
def receive: Receive = {
- case message: RemoteDaemonMessageProtocol ⇒
+ case message: RemoteSystemDaemonMessageProtocol ⇒
EventHandler.debug(this,
"Received command [\n%s] to RemoteClusterDaemon on node [%s]".format(message, cluster.nodeAddress.nodeName))
@@ -1735,7 +1735,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
}
- def handleRelease(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handleRelease(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
cluster.release(address)
@@ -1748,7 +1748,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
}
- def handleUse(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handleUse(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
def deserializeMessages(entriesAsBytes: Vector[Array[Byte]]): Vector[AnyRef] = {
import akka.cluster.RemoteProtocol._
import akka.cluster.MessageSerializer
@@ -1855,7 +1855,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
}
- def handle_fun0_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handle_fun0_unit(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(
Props(
self ⇒ {
@@ -1863,7 +1863,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
- def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handle_fun0_any(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(
Props(
self ⇒ {
@@ -1871,7 +1871,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
- def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handle_fun1_arg_unit(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(
Props(
self ⇒ {
@@ -1879,7 +1879,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
- def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handle_fun1_arg_any(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(
Props(
self ⇒ {
@@ -1887,12 +1887,12 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
- def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
+ def handleFailover(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) {
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
cluster.failOverClusterActorRefConnections(from, to)
}
- private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
+ private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) ⇒ throw error
case Right(instance) ⇒ instance.asInstanceOf[T]
diff --git a/akka-docs/intro/code/tutorials/first/Pi.scala b/akka-docs/intro/code/tutorials/first/Pi.scala
index 57e5b3558e..c346db3820 100644
--- a/akka-docs/intro/code/tutorials/first/Pi.scala
+++ b/akka-docs/intro/code/tutorials/first/Pi.scala
@@ -50,8 +50,7 @@ object Pi extends App {
//#calculatePiFor
def receive = {
- case Work(start, nrOfElements) ⇒
- reply(Result(calculatePiFor(start, nrOfElements))) // perform the work
+ case Work(start, nrOfElements) ⇒ channel ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
//#worker
diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst
index 77dff1856c..622c26f778 100644
--- a/akka-docs/scala/futures.rst
+++ b/akka-docs/scala/futures.rst
@@ -179,7 +179,7 @@ This is fine when dealing with a known amount of Actors, but can grow unwieldy i
To better explain what happened in the example, ``Future.sequence`` is taking the ``List[Future[Int]]`` and turning it into a ``Future[List[Int]]``. We can then use ``map`` to work with the ``List[Int]`` directly, and we find the sum of the ``List``.
-The ``traverse`` method is similar to ``sequence``, but it takes a ``T[A]`` and a function ``T => Future[B]`` to return a ``Future[T[B]]``, where ``T`` is again a subclass of Traversable. For example, to use ``traverse`` to sum the first 100 odd numbers:
+The ``traverse`` method is similar to ``sequence``, but it takes a ``T[A]`` and a function ``A => Future[B]`` to return a ``Future[T[B]]``, where ``T`` is again a subclass of Traversable. For example, to use ``traverse`` to sum the first 100 odd numbers:
.. code-block:: scala
diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
index 7c60c16e40..a7e964bd42 100644
--- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
+++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
@@ -365,7 +365,7 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(enum_scope:LifeCycleType)
}
- public enum RemoteDaemonMessageType
+ public enum RemoteSystemDaemonMessageType
implements com.google.protobuf.ProtocolMessageEnum {
STOP(0, 1),
USE(1, 2),
@@ -375,11 +375,13 @@ public final class RemoteProtocol {
DISCONNECT(5, 6),
RECONNECT(6, 7),
RESIGN(7, 8),
- FAIL_OVER_CONNECTIONS(8, 9),
- FUNCTION_FUN0_UNIT(9, 10),
- FUNCTION_FUN0_ANY(10, 11),
- FUNCTION_FUN1_ARG_UNIT(11, 12),
- FUNCTION_FUN1_ARG_ANY(12, 13),
+ GOSSIP(8, 9),
+ GOSSIP_ACK(9, 10),
+ FAIL_OVER_CONNECTIONS(10, 20),
+ FUNCTION_FUN0_UNIT(11, 21),
+ FUNCTION_FUN0_ANY(12, 22),
+ FUNCTION_FUN1_ARG_UNIT(13, 23),
+ FUNCTION_FUN1_ARG_ANY(14, 24),
;
public static final int STOP_VALUE = 1;
@@ -390,16 +392,18 @@ public final class RemoteProtocol {
public static final int DISCONNECT_VALUE = 6;
public static final int RECONNECT_VALUE = 7;
public static final int RESIGN_VALUE = 8;
- public static final int FAIL_OVER_CONNECTIONS_VALUE = 9;
- public static final int FUNCTION_FUN0_UNIT_VALUE = 10;
- public static final int FUNCTION_FUN0_ANY_VALUE = 11;
- public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 12;
- public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 13;
+ public static final int GOSSIP_VALUE = 9;
+ public static final int GOSSIP_ACK_VALUE = 10;
+ public static final int FAIL_OVER_CONNECTIONS_VALUE = 20;
+ public static final int FUNCTION_FUN0_UNIT_VALUE = 21;
+ public static final int FUNCTION_FUN0_ANY_VALUE = 22;
+ public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 23;
+ public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 24;
public final int getNumber() { return value; }
- public static RemoteDaemonMessageType valueOf(int value) {
+ public static RemoteSystemDaemonMessageType valueOf(int value) {
switch (value) {
case 1: return STOP;
case 2: return USE;
@@ -409,24 +413,26 @@ public final class RemoteProtocol {
case 6: return DISCONNECT;
case 7: return RECONNECT;
case 8: return RESIGN;
- case 9: return FAIL_OVER_CONNECTIONS;
- case 10: return FUNCTION_FUN0_UNIT;
- case 11: return FUNCTION_FUN0_ANY;
- case 12: return FUNCTION_FUN1_ARG_UNIT;
- case 13: return FUNCTION_FUN1_ARG_ANY;
+ case 9: return GOSSIP;
+ case 10: return GOSSIP_ACK;
+ case 20: return FAIL_OVER_CONNECTIONS;
+ case 21: return FUNCTION_FUN0_UNIT;
+ case 22: return FUNCTION_FUN0_ANY;
+ case 23: return FUNCTION_FUN1_ARG_UNIT;
+ case 24: return FUNCTION_FUN1_ARG_ANY;
default: return null;
}
}
- public static com.google.protobuf.Internal.EnumLiteMap