=act #3572 Add parens to sender
* because it is not referentially transparent; normally we reserved parens for side-effecting code but given how people thoughtlessly close over it we revised that that decision for sender * caller can still omit parens
This commit is contained in:
parent
537840bd2a
commit
a11fb1dafc
202 changed files with 631 additions and 620 deletions
|
|
@ -26,7 +26,7 @@ class ActorDSLSpec extends AkkaSpec {
|
|||
|
||||
val echo = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ sender ! x
|
||||
case x ⇒ sender() ! x
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -119,7 +119,7 @@ class ActorDSLSpec extends AkkaSpec {
|
|||
//#simple-actor
|
||||
val a = actor(new Act {
|
||||
become {
|
||||
case "hello" ⇒ sender ! "hi"
|
||||
case "hello" ⇒ sender() ! "hi"
|
||||
}
|
||||
})
|
||||
//#simple-actor
|
||||
|
|
@ -133,10 +133,10 @@ class ActorDSLSpec extends AkkaSpec {
|
|||
//#becomeStacked
|
||||
val a = actor(new Act {
|
||||
become { // this will replace the initial (empty) behavior
|
||||
case "info" ⇒ sender ! "A"
|
||||
case "info" ⇒ sender() ! "A"
|
||||
case "switch" ⇒
|
||||
becomeStacked { // this will stack upon the "A" behavior
|
||||
case "info" ⇒ sender ! "B"
|
||||
case "info" ⇒ sender() ! "B"
|
||||
case "switch" ⇒ unbecome() // return to the "A" behavior
|
||||
}
|
||||
case "lobotomize" ⇒ unbecome() // OH NOES: Actor.emptyBehavior
|
||||
|
|
@ -144,7 +144,7 @@ class ActorDSLSpec extends AkkaSpec {
|
|||
})
|
||||
//#becomeStacked
|
||||
|
||||
implicit def sender = testActor
|
||||
implicit val sender = testActor
|
||||
a ! "info"
|
||||
expectMsg("A")
|
||||
a ! "switch"
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@ object ActorFireForgetRequestReplySpec {
|
|||
class ReplyActor extends Actor {
|
||||
def receive = {
|
||||
case "Send" ⇒
|
||||
sender ! "Reply"
|
||||
sender() ! "Reply"
|
||||
case "SendImplicit" ⇒
|
||||
sender ! "ReplyImplicit"
|
||||
sender() ! "ReplyImplicit"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ object ActorLifeCycleSpec {
|
|||
val currentGen = generationProvider.getAndIncrement()
|
||||
override def preStart() { report("preStart") }
|
||||
override def postStop() { report("postStop") }
|
||||
def receive = { case "status" ⇒ sender ! message("OK") }
|
||||
def receive = { case "status" ⇒ sender() ! message("OK") }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -131,15 +131,15 @@ class ActorLifeCycleSpec extends AkkaSpec("akka.actor.serialize-messages=off") w
|
|||
case class Become(recv: ActorContext ⇒ Receive)
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Become(beh) ⇒ { context.become(beh(context), discardOld = false); sender ! "ok" }
|
||||
case x ⇒ sender ! 42
|
||||
case Become(beh) ⇒ { context.become(beh(context), discardOld = false); sender() ! "ok" }
|
||||
case x ⇒ sender() ! 42
|
||||
}
|
||||
}))
|
||||
a ! "hello"
|
||||
expectMsg(42)
|
||||
a ! Become(ctx ⇒ {
|
||||
case "fail" ⇒ throw new RuntimeException("buh")
|
||||
case x ⇒ ctx.sender ! 43
|
||||
case x ⇒ ctx.sender() ! 43
|
||||
})
|
||||
expectMsg("ok")
|
||||
a ! "hello"
|
||||
|
|
|
|||
|
|
@ -24,11 +24,11 @@ object ActorLookupSpec {
|
|||
|
||||
class Node extends Actor {
|
||||
def receive = {
|
||||
case Create(name) ⇒ sender ! context.actorOf(p, name)
|
||||
case LookupElems(path) ⇒ sender ! context.actorFor(path)
|
||||
case LookupString(path) ⇒ sender ! context.actorFor(path)
|
||||
case LookupPath(path) ⇒ sender ! context.actorFor(path)
|
||||
case GetSender(ref) ⇒ ref ! sender
|
||||
case Create(name) ⇒ sender() ! context.actorOf(p, name)
|
||||
case LookupElems(path) ⇒ sender() ! context.actorFor(path)
|
||||
case LookupString(path) ⇒ sender() ! context.actorFor(path)
|
||||
case LookupPath(path) ⇒ sender() ! context.actorFor(path)
|
||||
case GetSender(ref) ⇒ ref ! sender()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ object ActorMailboxSpec {
|
|||
|
||||
class QueueReportingActor extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! context.asInstanceOf[ActorCell].mailbox.messageQueue
|
||||
case _ ⇒ sender() ! context.asInstanceOf[ActorCell].mailbox.messageQueue
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,13 +22,13 @@ object ActorPerfSpec {
|
|||
|
||||
class EmptyActor extends Actor {
|
||||
def receive = {
|
||||
case IsAlive ⇒ sender ! Alive
|
||||
case IsAlive ⇒ sender() ! Alive
|
||||
}
|
||||
}
|
||||
|
||||
class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor {
|
||||
def receive = {
|
||||
case IsAlive ⇒ sender ! Alive
|
||||
case IsAlive ⇒ sender() ! Alive
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -36,15 +36,15 @@ object ActorPerfSpec {
|
|||
|
||||
def receive = {
|
||||
case IsAlive ⇒
|
||||
sender ! Alive
|
||||
sender() ! Alive
|
||||
case Create(number, propsCreator) ⇒
|
||||
for (i ← 1 to number) {
|
||||
context.actorOf(propsCreator.apply())
|
||||
}
|
||||
sender ! Created
|
||||
sender() ! Created
|
||||
case WaitForChildren(number) ⇒
|
||||
context.children.foreach(_ ! IsAlive)
|
||||
context.become(waiting(number, sender), false)
|
||||
context.become(waiting(number, sender()), false)
|
||||
}
|
||||
|
||||
def waiting(number: Int, replyTo: ActorRef): Receive = {
|
||||
|
|
|
|||
|
|
@ -25,15 +25,15 @@ object ActorRefSpec {
|
|||
|
||||
def receive = {
|
||||
case "complexRequest" ⇒ {
|
||||
replyTo = sender
|
||||
replyTo = sender()
|
||||
val worker = context.actorOf(Props[WorkerActor])
|
||||
worker ! "work"
|
||||
}
|
||||
case "complexRequest2" ⇒
|
||||
val worker = context.actorOf(Props[WorkerActor])
|
||||
worker ! ReplyTo(sender)
|
||||
worker ! ReplyTo(sender())
|
||||
case "workDone" ⇒ replyTo ! "complexReply"
|
||||
case "simpleRequest" ⇒ sender ! "simpleReply"
|
||||
case "simpleRequest" ⇒ sender() ! "simpleReply"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -42,7 +42,7 @@ object ActorRefSpec {
|
|||
def receive = {
|
||||
case "work" ⇒ {
|
||||
work()
|
||||
sender ! "workDone"
|
||||
sender() ! "workDone"
|
||||
context.stop(self)
|
||||
}
|
||||
case ReplyTo(replyTo) ⇒ {
|
||||
|
|
@ -71,7 +71,7 @@ object ActorRefSpec {
|
|||
|
||||
class OuterActor(val inner: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case "self" ⇒ sender ! self
|
||||
case "self" ⇒ sender() ! self
|
||||
case x ⇒ inner forward x
|
||||
}
|
||||
}
|
||||
|
|
@ -80,7 +80,7 @@ object ActorRefSpec {
|
|||
val fail = new InnerActor
|
||||
|
||||
def receive = {
|
||||
case "self" ⇒ sender ! self
|
||||
case "self" ⇒ sender() ! self
|
||||
case x ⇒ inner forward x
|
||||
}
|
||||
}
|
||||
|
|
@ -91,8 +91,8 @@ object ActorRefSpec {
|
|||
|
||||
class InnerActor extends Actor {
|
||||
def receive = {
|
||||
case "innerself" ⇒ sender ! self
|
||||
case other ⇒ sender ! other
|
||||
case "innerself" ⇒ sender() ! self
|
||||
case other ⇒ sender() ! other
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -100,8 +100,8 @@ object ActorRefSpec {
|
|||
val fail = new InnerActor
|
||||
|
||||
def receive = {
|
||||
case "innerself" ⇒ sender ! self
|
||||
case other ⇒ sender ! other
|
||||
case "innerself" ⇒ sender() ! self
|
||||
case other ⇒ sender() ! other
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -335,7 +335,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
"support nested actorOfs" in {
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
val nested = system.actorOf(Props(new Actor { def receive = { case _ ⇒ } }))
|
||||
def receive = { case _ ⇒ sender ! nested }
|
||||
def receive = { case _ ⇒ sender() ! nested }
|
||||
}))
|
||||
|
||||
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
|
||||
|
|
@ -391,8 +391,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
val timeout = Timeout(20000)
|
||||
val ref = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case 5 ⇒ sender ! "five"
|
||||
case 0 ⇒ sender ! "null"
|
||||
case 5 ⇒ sender() ! "five"
|
||||
case 0 ⇒ sender() ! "null"
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -438,7 +438,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
def receive = { case _ ⇒ }
|
||||
}), "child")
|
||||
|
||||
def receive = { case name: String ⇒ sender ! context.child(name).isDefined }
|
||||
def receive = { case name: String ⇒ sender() ! context.child(name).isDefined }
|
||||
}), "parent")
|
||||
|
||||
assert(Await.result((parent ? "child"), remaining) === true)
|
||||
|
|
|
|||
|
|
@ -24,12 +24,12 @@ object ActorSelectionSpec {
|
|||
|
||||
class Node extends Actor {
|
||||
def receive = {
|
||||
case Create(name) ⇒ sender ! context.actorOf(p, name)
|
||||
case SelectString(path) ⇒ sender ! context.actorSelection(path)
|
||||
case SelectPath(path) ⇒ sender ! context.actorSelection(path)
|
||||
case GetSender(ref) ⇒ ref ! sender
|
||||
case Create(name) ⇒ sender() ! context.actorOf(p, name)
|
||||
case SelectString(path) ⇒ sender() ! context.actorSelection(path)
|
||||
case SelectPath(path) ⇒ sender() ! context.actorSelection(path)
|
||||
case GetSender(ref) ⇒ ref ! sender()
|
||||
case Forward(path, msg) ⇒ context.actorSelection(path).forward(msg)
|
||||
case msg ⇒ sender ! msg
|
||||
case msg ⇒ sender() ! msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ object ActorSystemSpec {
|
|||
|
||||
def receive = {
|
||||
case n: Int ⇒
|
||||
master = sender
|
||||
master = sender()
|
||||
terminaters = Set() ++ (for (i ← 1 to n) yield {
|
||||
val man = context.watch(context.system.actorOf(Props[Terminater]))
|
||||
man ! "run"
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ object ActorWithBoundedStashSpec {
|
|||
def receive = {
|
||||
case msg: String if msg.startsWith("hello") ⇒
|
||||
stash()
|
||||
sender ! "ok"
|
||||
sender() ! "ok"
|
||||
|
||||
case "world" ⇒
|
||||
context.become(afterWorldBehaviour)
|
||||
|
|
@ -42,13 +42,13 @@ object ActorWithBoundedStashSpec {
|
|||
def receive = {
|
||||
case msg: String if msg.startsWith("hello") ⇒
|
||||
numStashed += 1
|
||||
try { stash(); sender ! "ok" } catch {
|
||||
try { stash(); sender() ! "ok" } catch {
|
||||
case _: StashOverflowException ⇒
|
||||
if (numStashed == 21) {
|
||||
sender ! "STASHOVERFLOW"
|
||||
sender() ! "STASHOVERFLOW"
|
||||
context stop self
|
||||
} else {
|
||||
sender ! "Unexpected StashOverflowException: " + numStashed
|
||||
sender() ! "Unexpected StashOverflowException: " + numStashed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ object Chameneos {
|
|||
|
||||
case Exit ⇒
|
||||
colour = FADED
|
||||
sender ! MeetingCount(meetings)
|
||||
sender() ! MeetingCount(meetings)
|
||||
}
|
||||
|
||||
def complement(otherColour: Colour): Colour = colour match {
|
||||
|
|
@ -95,11 +95,11 @@ object Chameneos {
|
|||
n -= 1
|
||||
chameneo ! msg
|
||||
waitingChameneo = None
|
||||
case None ⇒ waitingChameneo = Some(sender)
|
||||
case None ⇒ waitingChameneo = Some(sender())
|
||||
}
|
||||
} else {
|
||||
waitingChameneo.foreach(_ ! Exit)
|
||||
sender ! Exit
|
||||
sender() ! Exit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,18 +32,18 @@ object ConsistencySpec {
|
|||
case step: Long ⇒
|
||||
|
||||
if (lastStep != (step - 1))
|
||||
sender ! "Test failed: Last step %s, this step %s".format(lastStep, step)
|
||||
sender() ! "Test failed: Last step %s, this step %s".format(lastStep, step)
|
||||
|
||||
var shouldBeFortyTwo = left.value + right.value
|
||||
if (shouldBeFortyTwo != 42)
|
||||
sender ! "Test failed: 42 failed"
|
||||
sender() ! "Test failed: 42 failed"
|
||||
else {
|
||||
left.value += 1
|
||||
right.value -= 1
|
||||
}
|
||||
|
||||
lastStep = step
|
||||
case "done" ⇒ sender ! "done"; context.stop(self)
|
||||
case "done" ⇒ sender() ! "done"; context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
context.watch(terminal)
|
||||
context.unwatch(terminal)
|
||||
def receive = {
|
||||
case "ping" ⇒ sender ! "pong"
|
||||
case "ping" ⇒ sender() ! "pong"
|
||||
case t: Terminated ⇒ testActor ! WrappedTerminated(t)
|
||||
}
|
||||
}).withDeploy(Deploy.local))
|
||||
|
|
@ -113,7 +113,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
filterException[ActorKilledException] {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
|
||||
val terminalProps = Props(new Actor { def receive = { case x ⇒ sender ! x } })
|
||||
val terminalProps = Props(new Actor { def receive = { case x ⇒ sender() ! x } })
|
||||
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val monitor = startWatching(terminal)
|
||||
|
|
|
|||
|
|
@ -216,7 +216,7 @@ object FSMTimingSpec {
|
|||
setTimer("hallo", Tock, 1.milli.dilated)
|
||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated)
|
||||
cancelTimer("hallo")
|
||||
sender ! Tick
|
||||
sender() ! Tick
|
||||
setTimer("hallo", Tock, 500.millis.dilated)
|
||||
stay
|
||||
case Event(Tock, _) ⇒
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
val fsmref = system.actorOf(Props(new Actor with FSM[Int, ActorRef] {
|
||||
startWith(0, null)
|
||||
when(0) {
|
||||
case Event("switch", _) ⇒ goto(1) using sender
|
||||
case Event("switch", _) ⇒ goto(1) using sender()
|
||||
}
|
||||
onTransition {
|
||||
case x -> y ⇒ nextStateData ! (x -> y)
|
||||
|
|
@ -105,9 +105,9 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
when(1) {
|
||||
case Event("test", _) ⇒
|
||||
try {
|
||||
sender ! s"failed: ${nextStateData}"
|
||||
sender() ! s"failed: ${nextStateData}"
|
||||
} catch {
|
||||
case _: IllegalStateException ⇒ sender ! "ok"
|
||||
case _: IllegalStateException ⇒ sender() ! "ok"
|
||||
}
|
||||
stay
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ object ForwardActorSpec {
|
|||
|
||||
def createForwardingChain(system: ActorSystem): ActorRef = {
|
||||
val replier = system.actorOf(Props(new Actor {
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
def receive = { case x ⇒ sender() ! x }
|
||||
}))
|
||||
|
||||
def mkforwarder(forwardTo: ActorRef) = system.actorOf(Props(
|
||||
|
|
|
|||
|
|
@ -19,8 +19,8 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
"An Actor" must {
|
||||
"be able to become in its constructor" in {
|
||||
val a = system.actorOf(Props(new Becomer {
|
||||
context.become { case always ⇒ sender ! always }
|
||||
def receive = { case always ⇒ sender ! "FAILURE" }
|
||||
context.become { case always ⇒ sender() ! always }
|
||||
def receive = { case always ⇒ sender() ! "FAILURE" }
|
||||
}))
|
||||
a ! "pigdog"
|
||||
expectMsg("pigdog")
|
||||
|
|
@ -28,8 +28,8 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"be able to become multiple times in its constructor" in {
|
||||
val a = system.actorOf(Props(new Becomer {
|
||||
for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always })
|
||||
def receive = { case always ⇒ sender ! "FAILURE" }
|
||||
for (i ← 1 to 4) context.become({ case always ⇒ sender() ! i + ":" + always })
|
||||
def receive = { case always ⇒ sender() ! "FAILURE" }
|
||||
}))
|
||||
a ! "pigdog"
|
||||
expectMsg("4:pigdog")
|
||||
|
|
@ -37,8 +37,8 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"be able to become with stacking in its constructor" in {
|
||||
val a = system.actorOf(Props(new Becomer {
|
||||
context.become({ case always ⇒ sender ! "pigdog:" + always; context.unbecome() }, false)
|
||||
def receive = { case always ⇒ sender ! "badass:" + always }
|
||||
context.become({ case always ⇒ sender() ! "pigdog:" + always; context.unbecome() }, false)
|
||||
def receive = { case always ⇒ sender() ! "badass:" + always }
|
||||
}))
|
||||
a ! "pigdog"
|
||||
expectMsg("pigdog:pigdog")
|
||||
|
|
@ -48,8 +48,8 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"be able to become, with stacking, multiple times in its constructor" in {
|
||||
val a = system.actorOf(Props(new Becomer {
|
||||
for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always; context.unbecome() }, false)
|
||||
def receive = { case always ⇒ sender ! "FAILURE" }
|
||||
for (i ← 1 to 4) context.become({ case always ⇒ sender() ! i + ":" + always; context.unbecome() }, false)
|
||||
def receive = { case always ⇒ sender() ! "FAILURE" }
|
||||
}))
|
||||
a ! "pigdog"
|
||||
a ! "pigdog"
|
||||
|
|
@ -64,8 +64,8 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
"be able to hotswap its behavior with become(..)" in {
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "init" ⇒ sender ! "init"
|
||||
case "swap" ⇒ context.become({ case x: String ⇒ context.sender ! x })
|
||||
case "init" ⇒ sender() ! "init"
|
||||
case "swap" ⇒ context.become({ case x: String ⇒ context.sender() ! x })
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -79,9 +79,9 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
"be able to revert hotswap its behavior with unbecome" in {
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "init" ⇒ sender ! "init"
|
||||
case "init" ⇒ sender() ! "init"
|
||||
case "swap" ⇒ context.become({
|
||||
case "swapped" ⇒ sender ! "swapped"
|
||||
case "swapped" ⇒ sender() ! "swapped"
|
||||
case "revert" ⇒ context.unbecome()
|
||||
})
|
||||
}
|
||||
|
|
@ -103,14 +103,14 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "state" ⇒ sender ! "0"
|
||||
case "state" ⇒ sender() ! "0"
|
||||
case "swap" ⇒
|
||||
context.become({
|
||||
case "state" ⇒ sender ! "1"
|
||||
case "swapped" ⇒ sender ! "swapped"
|
||||
case "state" ⇒ sender() ! "1"
|
||||
case "swapped" ⇒ sender() ! "swapped"
|
||||
case "crash" ⇒ throw new Exception("Crash (expected)!")
|
||||
})
|
||||
sender ! "swapped"
|
||||
sender() ! "swapped"
|
||||
}
|
||||
}))
|
||||
a ! "state"
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi
|
|||
val child = context.actorOf(Props.empty, name = childName)
|
||||
def receive = {
|
||||
case "lookup" ⇒
|
||||
if (childName == child.path.name) sender ! context.actorFor(childName)
|
||||
else sender ! s"$childName is not ${child.path.name}!"
|
||||
if (childName == child.path.name) sender() ! context.actorFor(childName)
|
||||
else sender() ! s"$childName is not ${child.path.name}!"
|
||||
}
|
||||
}))
|
||||
a.tell("lookup", testActor)
|
||||
|
|
@ -62,7 +62,7 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi
|
|||
val GetChild = "GetChild"
|
||||
val a = watch(system.actorOf(Props(new Actor {
|
||||
val child = context.actorOf(Props.empty)
|
||||
def receive = { case `GetChild` ⇒ sender ! child }
|
||||
def receive = { case `GetChild` ⇒ sender() ! child }
|
||||
})))
|
||||
a.tell(GetChild, testActor)
|
||||
val child = expectMsgType[ActorRef]
|
||||
|
|
|
|||
|
|
@ -215,7 +215,7 @@ class RestartStrategySpec extends AkkaSpec("akka.actor.serialize-messages = off"
|
|||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
||||
case p: Props ⇒ sender() ! context.watch(context.actorOf(p))
|
||||
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
||||
}
|
||||
}))
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
|
|||
def receive = {
|
||||
case Tick ⇒
|
||||
if (ticks < 3) {
|
||||
sender ! Tock
|
||||
sender() ! Tock
|
||||
ticks += 1
|
||||
}
|
||||
}
|
||||
|
|
@ -66,7 +66,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
|
|||
}
|
||||
|
||||
"stop continuous scheduling if the receiving actor has been terminated" taggedAs TimingTest in {
|
||||
val actor = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }))
|
||||
val actor = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } }))
|
||||
|
||||
// run immediately and then every 100 milliseconds
|
||||
collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, actor, "msg"))
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ package akka.actor
|
|||
class Supervisor(override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case x: Props ⇒ sender ! context.actorOf(x)
|
||||
case x: Props ⇒ sender() ! context.actorOf(x)
|
||||
}
|
||||
// need to override the default of stopping all children upon restart, tests rely on keeping them around
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ object SupervisorHierarchySpec {
|
|||
class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
case p: Props ⇒ sender() ! context.actorOf(p)
|
||||
}
|
||||
// test relies on keeping children around during restart
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
|
|
@ -48,9 +48,9 @@ object SupervisorHierarchySpec {
|
|||
class Resumer extends Actor {
|
||||
override def supervisorStrategy = OneForOneStrategy() { case _ ⇒ SupervisorStrategy.Resume }
|
||||
def receive = {
|
||||
case "spawn" ⇒ sender ! context.actorOf(Props[Resumer])
|
||||
case "spawn" ⇒ sender() ! context.actorOf(Props[Resumer])
|
||||
case "fail" ⇒ throw new Exception("expected")
|
||||
case "ping" ⇒ sender ! "pong"
|
||||
case "ping" ⇒ sender() ! "pong"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -197,23 +197,23 @@ object SupervisorHierarchySpec {
|
|||
}
|
||||
override val supervisorStrategy = OneForOneStrategy()(unwrap andThen {
|
||||
case (_: Failure, _) if pongsToGo > 0 ⇒
|
||||
log :+= Event("pongOfDeath resuming " + sender, identityHashCode(this))
|
||||
log :+= Event("pongOfDeath resuming " + sender(), identityHashCode(this))
|
||||
Resume
|
||||
case (f: Failure, orig) ⇒
|
||||
if (f.depth > 0) {
|
||||
setFlags(f.directive)
|
||||
log :+= Event("escalating " + f + " from " + sender, identityHashCode(this))
|
||||
log :+= Event("escalating " + f + " from " + sender(), identityHashCode(this))
|
||||
throw f.copy(depth = f.depth - 1)
|
||||
}
|
||||
val prefix = orig match {
|
||||
case f: Failure ⇒ "applying "
|
||||
case _ ⇒ "re-applying "
|
||||
}
|
||||
log :+= Event(prefix + f + " to " + sender, identityHashCode(this))
|
||||
log :+= Event(prefix + f + " to " + sender(), identityHashCode(this))
|
||||
if (myLevel > 3 && f.failPost == 0 && f.stop) Stop else f.directive
|
||||
case (_, x) ⇒
|
||||
log :+= Event("unhandled exception from " + sender + Logging.stackTraceFor(x), identityHashCode(this))
|
||||
sender ! Dump(0)
|
||||
log :+= Event("unhandled exception from " + sender() + Logging.stackTraceFor(x), identityHashCode(this))
|
||||
sender() ! Dump(0)
|
||||
context.system.scheduler.scheduleOnce(1 second, self, Dump(0))(context.dispatcher)
|
||||
Resume
|
||||
})
|
||||
|
|
@ -276,7 +276,7 @@ object SupervisorHierarchySpec {
|
|||
setFlags(f.directive)
|
||||
stateCache.put(self.path, stateCache.get(self.path).copy(failConstr = f.copy()))
|
||||
throw f
|
||||
case "ping" ⇒ { Thread.sleep((random.nextFloat * 1.03).toLong); sender ! "pong" }
|
||||
case "ping" ⇒ { Thread.sleep((random.nextFloat * 1.03).toLong); sender() ! "pong" }
|
||||
case Dump(0) ⇒ abort("dump")
|
||||
case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1))
|
||||
case Terminated(ref) ⇒
|
||||
|
|
@ -296,7 +296,7 @@ object SupervisorHierarchySpec {
|
|||
} else {
|
||||
// WARNING: The Terminated that is logged by this is logged by check() above, too. It is not
|
||||
// an indication of duplicate Terminate messages
|
||||
log :+= Event(sender + " terminated while pongOfDeath", identityHashCode(Hierarchy.this))
|
||||
log :+= Event(sender() + " terminated while pongOfDeath", identityHashCode(Hierarchy.this))
|
||||
}
|
||||
case Abort ⇒ abort("terminating")
|
||||
case PingOfDeath ⇒
|
||||
|
|
@ -518,8 +518,8 @@ object SupervisorHierarchySpec {
|
|||
bury(path)
|
||||
stay
|
||||
case Event("pong", _) ⇒
|
||||
pingChildren -= sender
|
||||
idleChildren :+= sender
|
||||
pingChildren -= sender()
|
||||
idleChildren :+= sender()
|
||||
stay
|
||||
case Event(StateTimeout, todo) ⇒
|
||||
log.info("dumping state due to StateTimeout")
|
||||
|
|
@ -538,8 +538,8 @@ object SupervisorHierarchySpec {
|
|||
|
||||
when(Finishing) {
|
||||
case Event("pong", _) ⇒
|
||||
pingChildren -= sender
|
||||
idleChildren :+= sender
|
||||
pingChildren -= sender()
|
||||
idleChildren :+= sender()
|
||||
if (pingChildren.isEmpty) goto(LastPing) else stay
|
||||
case Event(Died(ref), _) ⇒
|
||||
bury(ref)
|
||||
|
|
@ -555,8 +555,8 @@ object SupervisorHierarchySpec {
|
|||
|
||||
when(LastPing) {
|
||||
case Event("pong", _) ⇒
|
||||
pingChildren -= sender
|
||||
idleChildren :+= sender
|
||||
pingChildren -= sender()
|
||||
idleChildren :+= sender()
|
||||
if (pingChildren.isEmpty) goto(Stopping) else stay
|
||||
case Event(Died(ref), _) ⇒
|
||||
bury(ref)
|
||||
|
|
@ -605,7 +605,7 @@ object SupervisorHierarchySpec {
|
|||
testActor ! "timeout in Stopping"
|
||||
stop
|
||||
case Event(e: ErrorLog, _) ⇒
|
||||
errors :+= sender -> e
|
||||
errors :+= sender() -> e
|
||||
goto(Failed)
|
||||
}
|
||||
|
||||
|
|
@ -631,7 +631,7 @@ object SupervisorHierarchySpec {
|
|||
when(Failed, stateTimeout = 5.seconds.dilated) {
|
||||
case Event(e: ErrorLog, _) ⇒
|
||||
if (!e.msg.startsWith("not resumed") || !ignoreNotResumedLogs)
|
||||
errors :+= sender -> e
|
||||
errors :+= sender() -> e
|
||||
stay
|
||||
case Event(Terminated(r), _) if r == hierarchy ⇒
|
||||
printErrors()
|
||||
|
|
@ -694,7 +694,7 @@ object SupervisorHierarchySpec {
|
|||
case Event(e: ErrorLog, _) ⇒
|
||||
if (e.msg.startsWith("not resumed")) stay
|
||||
else {
|
||||
errors :+= sender -> e
|
||||
errors :+= sender() -> e
|
||||
// don’t stop the hierarchy, that is going to happen all by itself and in the right order
|
||||
goto(Failed)
|
||||
}
|
||||
|
|
@ -794,7 +794,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
|
|||
val slowResumer = system.actorOf(Props(new Actor {
|
||||
override def supervisorStrategy = OneForOneStrategy() { case _ ⇒ Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume }
|
||||
def receive = {
|
||||
case "spawn" ⇒ sender ! context.actorOf(Props[Resumer])
|
||||
case "spawn" ⇒ sender() ! context.actorOf(Props[Resumer])
|
||||
}
|
||||
}), "slowResumer")
|
||||
slowResumer ! "spawn"
|
||||
|
|
@ -848,7 +848,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
|
|||
postRestartCalled.incrementAndGet()
|
||||
}
|
||||
override def receive = {
|
||||
case m ⇒ sender ! m
|
||||
case m ⇒ sender() ! m
|
||||
}
|
||||
}), "failChild")
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
|||
val workerProps = Props(new Actor {
|
||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||
def receive = {
|
||||
case "status" ⇒ this.sender ! "OK"
|
||||
case "status" ⇒ this.sender() ! "OK"
|
||||
case _ ⇒ this.context.stop(self)
|
||||
}
|
||||
})
|
||||
|
|
@ -146,7 +146,7 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
|||
"have access to the failing child’s reference in supervisorStrategy" in {
|
||||
val parent = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case _: Exception ⇒ testActor ! sender; SupervisorStrategy.Stop
|
||||
case _: Exception ⇒ testActor ! sender(); SupervisorStrategy.Stop
|
||||
}
|
||||
def receive = {
|
||||
case "doit" ⇒ context.actorOf(Props.empty, "child") ! Kill
|
||||
|
|
|
|||
|
|
@ -36,13 +36,13 @@ object SupervisorSpec {
|
|||
def receive = {
|
||||
case Ping ⇒
|
||||
sendTo ! PingMessage
|
||||
if (sender != sendTo)
|
||||
sender ! PongMessage
|
||||
if (sender() != sendTo)
|
||||
sender() ! PongMessage
|
||||
case Die ⇒
|
||||
throw new RuntimeException(ExceptionMessage)
|
||||
case DieReply ⇒
|
||||
val e = new RuntimeException(ExceptionMessage)
|
||||
sender ! Status.Failure(e)
|
||||
sender() ! Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
|
||||
|
|
@ -172,7 +172,7 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit
|
|||
override def postStop() { postStops += 1; testActor ! ("postStop" + postStops) }
|
||||
def receive = {
|
||||
case "crash" ⇒ { testActor ! "crashed"; throw new RuntimeException("Expected") }
|
||||
case "ping" ⇒ sender ! "pong"
|
||||
case "ping" ⇒ sender() ! "pong"
|
||||
}
|
||||
}
|
||||
val master = system.actorOf(Props(new Actor {
|
||||
|
|
@ -349,10 +349,10 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case Ping ⇒ sender ! PongMessage
|
||||
case Ping ⇒ sender() ! PongMessage
|
||||
case DieReply ⇒
|
||||
val e = new RuntimeException("Expected")
|
||||
sender ! Status.Failure(e)
|
||||
sender() ! Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
})
|
||||
|
|
@ -386,7 +386,7 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit
|
|||
override def postRestart(reason: Throwable): Unit = testActor ! "child restarted"
|
||||
def receive = {
|
||||
case l: TestLatch ⇒ { Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES") }
|
||||
case "test" ⇒ sender ! "child green"
|
||||
case "test" ⇒ sender() ! "child green"
|
||||
}
|
||||
}), "child"))
|
||||
|
||||
|
|
@ -401,9 +401,9 @@ class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") wit
|
|||
def receive = {
|
||||
case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated"
|
||||
case l: TestLatch ⇒ child ! l
|
||||
case "test" ⇒ sender ! "green"
|
||||
case "test" ⇒ sender() ! "green"
|
||||
case "testchild" ⇒ child forward "test"
|
||||
case "testchildAndAck" ⇒ child forward "test"; sender ! "ack"
|
||||
case "testchildAndAck" ⇒ child forward "test"; sender() ! "ack"
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class SupervisorTreeSpec extends AkkaSpec("akka.actor.serialize-messages = off")
|
|||
val p = Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
case p: Props ⇒ sender() ! context.actorOf(p)
|
||||
}
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path }
|
||||
})
|
||||
|
|
|
|||
|
|
@ -59,11 +59,11 @@ object Ticket669Spec {
|
|||
}
|
||||
|
||||
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
|
||||
sender ! "failure1"
|
||||
sender() ! "failure1"
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
sender ! "failure2"
|
||||
sender() ! "failure2"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -353,7 +353,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
def receive = {
|
||||
case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p)
|
||||
case p: TypedProps[_] ⇒ context.sender() ! TypedActor(context).typedActorOf(p)
|
||||
}
|
||||
}))
|
||||
val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -86,15 +86,15 @@ object ActorModelSpec {
|
|||
case Meet(sign, wait) ⇒ { ack(); sign.countDown(); wait.await(); busy.switchOff(()) }
|
||||
case Wait(time) ⇒ { ack(); Thread.sleep(time); busy.switchOff(()) }
|
||||
case WaitAck(time, l) ⇒ { ack(); Thread.sleep(time); l.countDown(); busy.switchOff(()) }
|
||||
case Reply(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()) }
|
||||
case TryReply(msg) ⇒ { ack(); sender.tell(msg, null); busy.switchOff(()) }
|
||||
case Reply(msg) ⇒ { ack(); sender() ! msg; busy.switchOff(()) }
|
||||
case TryReply(msg) ⇒ { ack(); sender().tell(msg, null); busy.switchOff(()) }
|
||||
case Forward(to, msg) ⇒ { ack(); to.forward(msg); busy.switchOff(()) }
|
||||
case CountDown(latch) ⇒ { ack(); latch.countDown(); busy.switchOff(()) }
|
||||
case Increment(count) ⇒ { ack(); count.incrementAndGet(); busy.switchOff(()) }
|
||||
case CountDownNStop(l) ⇒ { ack(); l.countDown(); context.stop(self); busy.switchOff(()) }
|
||||
case Restart ⇒ { ack(); busy.switchOff(()); throw new Exception("Restart requested") }
|
||||
case Interrupt ⇒ { ack(); sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") }
|
||||
case InterruptNicely(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()); Thread.currentThread().interrupt() }
|
||||
case Interrupt ⇒ { ack(); sender() ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") }
|
||||
case InterruptNicely(msg) ⇒ { ack(); sender() ! msg; busy.switchOff(()); Thread.currentThread().interrupt() }
|
||||
case ThrowException(e: Throwable) ⇒ { ack(); busy.switchOff(()); throw e }
|
||||
case DoubleStop ⇒ { ack(); context.stop(self); context.stop(self); busy.switchOff }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ object DispatcherActorSpec {
|
|||
"""
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" ⇒ sender ! "World"
|
||||
case "Hello" ⇒ sender() ! "World"
|
||||
case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
|
@ -86,7 +86,7 @@ class DispatcherActorSpec extends AkkaSpec(DispatcherActorSpec.config) with Defa
|
|||
val slowOne = system.actorOf(
|
||||
Props(new Actor {
|
||||
def receive = {
|
||||
case "hogexecutor" ⇒ { sender ! "OK"; start.await }
|
||||
case "hogexecutor" ⇒ { sender() ! "OK"; start.await }
|
||||
case "ping" ⇒ if (works.get) latch.countDown()
|
||||
}
|
||||
}).withDispatcher(throughputDispatcher))
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ object DispatchersSpec {
|
|||
|
||||
class ThreadNameEcho extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! Thread.currentThread.getName
|
||||
case _ ⇒ sender() ! Thread.currentThread.getName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ object PinnedActorSpec {
|
|||
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" ⇒ sender ! "World"
|
||||
case "Hello" ⇒ sender() ! "World"
|
||||
case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
|||
"support reply via sender" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "do" ⇒ Future(31) pipeTo context.sender
|
||||
case "ex" ⇒ Future(throw new AssertionError) pipeTo context.sender
|
||||
case "do" ⇒ Future(31) pipeTo context.sender()
|
||||
case "ex" ⇒ Future(throw new AssertionError) pipeTo context.sender()
|
||||
}
|
||||
}))
|
||||
Await.result(actor ? "do", timeout.duration) should be(31)
|
||||
|
|
|
|||
|
|
@ -31,9 +31,9 @@ object FutureSpec {
|
|||
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" ⇒ sender ! "World"
|
||||
case "Hello" ⇒ sender() ! "World"
|
||||
case "Failure" ⇒
|
||||
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||
sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||
case "NoReply" ⇒
|
||||
}
|
||||
}
|
||||
|
|
@ -41,11 +41,11 @@ object FutureSpec {
|
|||
class TestDelayActor(await: TestLatch) extends Actor {
|
||||
def receive = {
|
||||
case "Hello" ⇒
|
||||
FutureSpec.ready(await, TestLatch.DefaultTimeout); sender ! "World"
|
||||
FutureSpec.ready(await, TestLatch.DefaultTimeout); sender() ! "World"
|
||||
case "NoReply" ⇒ FutureSpec.ready(await, TestLatch.DefaultTimeout)
|
||||
case "Failure" ⇒
|
||||
FutureSpec.ready(await, TestLatch.DefaultTimeout)
|
||||
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||
sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -200,7 +200,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
"will return a result" must {
|
||||
behave like futureWithResult { test ⇒
|
||||
val actor1 = system.actorOf(Props[TestActor])
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }))
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } }))
|
||||
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
|
||||
FutureSpec.ready(future, timeout.duration)
|
||||
test(future, "WORLD")
|
||||
|
|
@ -212,7 +212,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
behave like futureWithException[ArithmeticException] { test ⇒
|
||||
filterException[ArithmeticException] {
|
||||
val actor1 = system.actorOf(Props[TestActor])
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender ! Status.Failure(new ArithmeticException("/ by zero")) } }))
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! Status.Failure(new ArithmeticException("/ by zero")) } }))
|
||||
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
|
||||
FutureSpec.ready(future, timeout.duration)
|
||||
test(future, "/ by zero")
|
||||
|
|
@ -225,7 +225,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
behave like futureWithException[NoSuchElementException] { test ⇒
|
||||
filterException[NoSuchElementException] {
|
||||
val actor1 = system.actorOf(Props[TestActor])
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }))
|
||||
val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } }))
|
||||
val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i }
|
||||
FutureSpec.ready(future, timeout.duration)
|
||||
test(future, "World (of class java.lang.String)")
|
||||
|
|
@ -242,8 +242,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
filterException[ClassCastException] {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s.length
|
||||
case i: Int ⇒ sender ! (i * 2).toString
|
||||
case s: String ⇒ sender() ! s.length
|
||||
case i: Int ⇒ sender() ! (i * 2).toString
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -272,8 +272,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
filterException[NoSuchElementException] {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Req(s: String) ⇒ sender ! Res(s.length)
|
||||
case Req(i: Int) ⇒ sender ! Res((i * 2).toString)
|
||||
case Req(s: String) ⇒ sender() ! Res(s.length)
|
||||
case Req(i: Int) ⇒ sender() ! Res((i * 2).toString)
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -474,7 +474,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
var counter = 1
|
||||
def receive = {
|
||||
case 'GetNext ⇒
|
||||
sender ! counter
|
||||
sender() ! counter
|
||||
counter += 2
|
||||
}
|
||||
}))
|
||||
|
|
|
|||
|
|
@ -260,7 +260,7 @@ class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerO
|
|||
def receive = {
|
||||
case Ping ⇒
|
||||
n -= 1
|
||||
sender ! Ping
|
||||
sender() ! Ping
|
||||
if (n == 0)
|
||||
context stop self
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
|
|||
|
||||
def receive = {
|
||||
case i: Int ⇒ acc += i
|
||||
case 'Result ⇒ sender ! acc.toList
|
||||
case 'Result ⇒ sender() ! acc.toList
|
||||
}
|
||||
}).withDispatcher(dispatcherKey))
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ object EventStreamSpec {
|
|||
case Logging.InitializeLogger(bus) ⇒
|
||||
bus.subscribe(context.self, classOf[SetTarget])
|
||||
bus.subscribe(context.self, classOf[UnhandledMessage])
|
||||
sender ! Logging.LoggerInitialized
|
||||
sender() ! Logging.LoggerInitialized
|
||||
case SetTarget(ref) ⇒ { dst = ref; dst ! "OK" }
|
||||
case e: Logging.LogEvent ⇒ dst ! e
|
||||
case u: UnhandledMessage ⇒ dst ! u
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ object LoggerSpec {
|
|||
override def receive: Receive = {
|
||||
case InitializeLogger(bus) ⇒
|
||||
bus.subscribe(context.self, classOf[SetTarget])
|
||||
sender ! LoggerInitialized
|
||||
sender() ! LoggerInitialized
|
||||
case SetTarget(ref, `qualifier`) ⇒
|
||||
target = Some(ref)
|
||||
ref ! ("OK")
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
val actor = TestActorRef(new Actor {
|
||||
def switch: Actor.Receive = { case "becomenull" ⇒ context.become(r, false) }
|
||||
def receive = switch orElse LoggingReceive {
|
||||
case x ⇒ sender ! "x"
|
||||
case x ⇒ sender() ! "x"
|
||||
}
|
||||
})
|
||||
|
||||
|
|
@ -110,7 +110,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
val actor = TestActorRef(new Actor {
|
||||
def receive = LoggingReceive(LoggingReceive {
|
||||
case _ ⇒ sender ! "x"
|
||||
case _ ⇒ sender() ! "x"
|
||||
})
|
||||
})
|
||||
actor ! "buh"
|
||||
|
|
|
|||
|
|
@ -31,13 +31,13 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
|||
connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
|
||||
val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
|
||||
val clientHandler = TestProbe()
|
||||
connectCommander.sender ! Register(clientHandler.ref)
|
||||
connectCommander.sender() ! Register(clientHandler.ref)
|
||||
|
||||
val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected]
|
||||
val serverHandler = TestProbe()
|
||||
bindHandler.sender ! Register(serverHandler.ref)
|
||||
bindHandler.sender() ! Register(serverHandler.ref)
|
||||
|
||||
(clientHandler, connectCommander.sender, serverHandler, bindHandler.sender)
|
||||
(clientHandler, connectCommander.sender(), serverHandler, bindHandler.sender())
|
||||
}
|
||||
|
||||
@tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
|
||||
|
|
|
|||
|
|
@ -20,14 +20,14 @@ class UdpConnectedIntegrationSpec extends AkkaSpec("""
|
|||
val commander = TestProbe()
|
||||
commander.send(IO(Udp), Udp.Bind(handler, address))
|
||||
commander.expectMsg(Udp.Bound(address))
|
||||
commander.sender
|
||||
commander.sender()
|
||||
}
|
||||
|
||||
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = {
|
||||
val commander = TestProbe()
|
||||
commander.send(IO(UdpConnected), UdpConnected.Connect(handler, remoteAddress, localAddress, Nil))
|
||||
commander.expectMsg(UdpConnected.Connected)
|
||||
commander.sender
|
||||
commander.sender()
|
||||
}
|
||||
|
||||
"The UDP connection oriented implementation" must {
|
||||
|
|
|
|||
|
|
@ -20,14 +20,14 @@ class UdpIntegrationSpec extends AkkaSpec("""
|
|||
val commander = TestProbe()
|
||||
commander.send(IO(Udp), Bind(handler, address))
|
||||
commander.expectMsg(Bound(address))
|
||||
commander.sender
|
||||
commander.sender()
|
||||
}
|
||||
|
||||
val simpleSender: ActorRef = {
|
||||
val commander = TestProbe()
|
||||
commander.send(IO(Udp), SimpleSender)
|
||||
commander.expectMsg(SimpleSenderReady)
|
||||
commander.sender
|
||||
commander.sender()
|
||||
}
|
||||
|
||||
"The UDP Fire-and-Forget implementation" must {
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
"be picked up from Props" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
case "get" ⇒ sender() ! context.props
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(12)), "someOther")
|
||||
routerConfig(actor) should equal(RoundRobinRouter(12))
|
||||
|
|
@ -72,7 +72,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
"be overridable in config" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
case "get" ⇒ sender() ! context.props
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(12)), "config")
|
||||
routerConfig(actor) should equal(RandomPool(4))
|
||||
|
|
@ -82,7 +82,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
"be overridable in explicit deployment" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
case "get" ⇒ sender() ! context.props
|
||||
}
|
||||
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther")
|
||||
routerConfig(actor) should equal(RoundRobinRouter(12))
|
||||
|
|
@ -92,7 +92,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
"be overridable in config even with explicit deployment" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
case "get" ⇒ sender() ! context.props
|
||||
}
|
||||
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config")
|
||||
routerConfig(actor) should equal(RandomPool(4))
|
||||
|
|
@ -159,7 +159,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
val actor = system.actorOf(Props(new Actor {
|
||||
lazy val id = counter.getAndIncrement()
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! id
|
||||
case "hit" ⇒ sender() ! id
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(connectionCount)), "round-robin")
|
||||
|
|
@ -208,7 +208,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "hello" ⇒ sender ! "world"
|
||||
case "hello" ⇒ sender() ! "world"
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
|
|
@ -244,7 +244,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
val actor = system.actorOf(Props(new Actor {
|
||||
lazy val id = counter.getAndIncrement()
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! id
|
||||
case "hit" ⇒ sender() ! id
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
}
|
||||
}).withRouter(RandomRouter(connectionCount)), "random")
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ object ConsistentHashingRouterSpec {
|
|||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
case _ ⇒ sender() ! self
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -144,8 +144,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
|||
val router = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case d: FiniteDuration ⇒
|
||||
Thread.sleep(d.dilated.toMillis); sender ! "done"
|
||||
case "echo" ⇒ sender ! "reply"
|
||||
Thread.sleep(d.dilated.toMillis); sender() ! "done"
|
||||
case "echo" ⇒ sender() ! "reply"
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
|
||||
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ object RoutingSpec {
|
|||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
case _ ⇒ sender() ! self
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -224,8 +224,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
def receive = {
|
||||
case "start" ⇒
|
||||
context.actorOf(Props(new Actor {
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
}).withRouter(RoundRobinRouter(2))) ? "hello" pipeTo sender
|
||||
def receive = { case x ⇒ sender() ! x }
|
||||
}).withRouter(RoundRobinRouter(2))) ? "hello" pipeTo sender()
|
||||
}
|
||||
})) ! "start"
|
||||
expectMsg("hello")
|
||||
|
|
@ -470,7 +470,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
case "end" ⇒ doneLatch.countDown()
|
||||
case msg: Int ⇒
|
||||
counter1.addAndGet(msg)
|
||||
sender ! "ack"
|
||||
sender() ! "ack"
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
@ -552,7 +552,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
case _id: Int if (_id == id) ⇒
|
||||
case x ⇒ {
|
||||
Thread sleep 100 * id
|
||||
sender ! id
|
||||
sender() ! id
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ class AskSpec extends AkkaSpec {
|
|||
|
||||
"return broken promises on 0 timeout" in {
|
||||
implicit val timeout = Timeout(0 seconds)
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }))
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } }))
|
||||
val f = echo ? "foo"
|
||||
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
|
||||
intercept[IllegalArgumentException] {
|
||||
|
|
@ -59,7 +59,7 @@ class AskSpec extends AkkaSpec {
|
|||
|
||||
"return broken promises on < 0 timeout" in {
|
||||
implicit val timeout = Timeout(-1000 seconds)
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }))
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } }))
|
||||
val f = echo ? "foo"
|
||||
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
|
||||
intercept[IllegalArgumentException] {
|
||||
|
|
@ -87,7 +87,7 @@ class AskSpec extends AkkaSpec {
|
|||
"work for ActorSelection" in {
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
import system.dispatcher
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }), "select-echo")
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } }), "select-echo")
|
||||
val identityFuture = (system.actorSelection("/user/select-echo") ? Identify(None))
|
||||
.mapTo[ActorIdentity].map(_.ref.get)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ class PipeToSpec extends AkkaSpec {
|
|||
p.expectMsgType[Status.Failure].cause.getMessage should be("failed")
|
||||
}
|
||||
|
||||
"pick up an implicit sender" in {
|
||||
"pick up an implicit sender()" in {
|
||||
val p = TestProbe()
|
||||
implicit val s = testActor
|
||||
Future(42) pipeTo p.ref
|
||||
|
|
@ -41,7 +41,7 @@ class PipeToSpec extends AkkaSpec {
|
|||
p.expectMsg(42)
|
||||
}
|
||||
|
||||
"work in Java form with sender" in {
|
||||
"work in Java form with sender()" in {
|
||||
val p = TestProbe()
|
||||
pipe(Future(42)) to (p.ref, testActor)
|
||||
p.expectMsg(42)
|
||||
|
|
@ -66,7 +66,7 @@ class PipeToSpec extends AkkaSpec {
|
|||
p.expectMsgType[Status.Failure].cause.getMessage should be("failed")
|
||||
}
|
||||
|
||||
"pick up an implicit sender" in {
|
||||
"pick up an implicit sender()" in {
|
||||
val p = TestProbe()
|
||||
val sel = system.actorSelection(p.ref.path)
|
||||
implicit val s = testActor
|
||||
|
|
@ -82,7 +82,7 @@ class PipeToSpec extends AkkaSpec {
|
|||
p.expectMsg(42)
|
||||
}
|
||||
|
||||
"work in Java form with sender" in {
|
||||
"work in Java form with sender()" in {
|
||||
val p = TestProbe()
|
||||
val sel = system.actorSelection(p.ref.path)
|
||||
pipe(Future(42)) to (sel, testActor)
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ object TellLatencyPerformanceSpec {
|
|||
|
||||
class Destination extends Actor {
|
||||
def receive = {
|
||||
case msg: Msg ⇒ sender ! msg
|
||||
case msg: Msg ⇒ sender() ! msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -154,7 +154,7 @@ object TellThroughputComputationPerformanceSpec {
|
|||
def receive = {
|
||||
case Msg ⇒
|
||||
calculatePi()
|
||||
sender ! Msg
|
||||
sender() ! Msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -141,13 +141,13 @@ object TellThroughputPerformanceSpec {
|
|||
|
||||
class Destination1 extends Actor {
|
||||
def receive = {
|
||||
case Msg ⇒ sender ! Msg
|
||||
case Msg ⇒ sender() ! Msg
|
||||
}
|
||||
}
|
||||
|
||||
class Destination2 extends Actor {
|
||||
def receive = {
|
||||
case Msg ⇒ sender ! Msg
|
||||
case Msg ⇒ sender() ! Msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook])
|
|||
|
||||
def done(status: Boolean, order: Order) {
|
||||
if (standby.isEmpty) {
|
||||
sender ! Rsp(order, status)
|
||||
sender() ! Rsp(order, status)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver with ActorLogging {
|
|||
m forward order
|
||||
case None ⇒
|
||||
log.warning("Unknown orderbook: " + order.orderbookSymbol)
|
||||
sender ! Rsp(order, false)
|
||||
sender() ! Rsp(order, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class BroadcastSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
case "end" ⇒ doneLatch.countDown()
|
||||
case msg: Int ⇒
|
||||
counter1.addAndGet(msg)
|
||||
sender ! "ack"
|
||||
sender() ! "ack"
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ object ConfiguredLocalRoutingSpec {
|
|||
|
||||
class EchoProps extends Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
case "get" ⇒ sender() ! context.props
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,8 +32,8 @@ object ConsistentHashingRouterSpec {
|
|||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case x: ConsistentHashableEnvelope ⇒ sender ! s"Unexpected envelope: $x"
|
||||
case _ ⇒ sender ! self
|
||||
case x: ConsistentHashableEnvelope ⇒ sender() ! s"Unexpected envelope: $x"
|
||||
case _ ⇒ sender() ! self
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class RandomSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
|
||||
val actor = system.actorOf(RandomPool(7).props(Props(new Actor {
|
||||
def receive = {
|
||||
case "hello" ⇒ sender ! "world"
|
||||
case "hello" ⇒ sender() ! "world"
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
|
|
@ -58,7 +58,7 @@ class RandomSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
Props(new Actor {
|
||||
lazy val id = counter.getAndIncrement()
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! id
|
||||
case "hit" ⇒ sender() ! id
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
}
|
||||
})), name = "random")
|
||||
|
|
|
|||
|
|
@ -144,8 +144,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
|||
Props(new Actor {
|
||||
def receive = {
|
||||
case d: FiniteDuration ⇒
|
||||
Thread.sleep(d.dilated.toMillis); sender ! "done"
|
||||
case "echo" ⇒ sender ! "reply"
|
||||
Thread.sleep(d.dilated.toMillis); sender() ! "done"
|
||||
case "echo" ⇒ sender() ! "reply"
|
||||
}
|
||||
})))
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ class RoundRobinSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
val actor = system.actorOf(RoundRobinPool(connectionCount).props(routeeProps = Props(new Actor {
|
||||
lazy val id = counter.getAndIncrement()
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! id
|
||||
case "hit" ⇒ sender() ! id
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
}
|
||||
})), "round-robin")
|
||||
|
|
@ -128,7 +128,7 @@ class RoundRobinSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
val paths = (1 to connectionCount) map { n ⇒
|
||||
val ref = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! self.path.name
|
||||
case "hit" ⇒ sender() ! self.path.name
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
}
|
||||
}), name = "target-" + n)
|
||||
|
|
@ -170,13 +170,13 @@ class RoundRobinSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
router = router.removeRoutee(c)
|
||||
if (router.routees.isEmpty)
|
||||
context.stop(self)
|
||||
case other ⇒ router.route(other, sender)
|
||||
case other ⇒ router.route(other, sender())
|
||||
}
|
||||
}))
|
||||
|
||||
val childProps = Props(new Actor {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! self.path.name
|
||||
case "hit" ⇒ sender() ! self.path.name
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ object RoutingSpec {
|
|||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
case _ ⇒ sender() ! self
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -202,8 +202,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
def receive = {
|
||||
case "start" ⇒
|
||||
context.actorOf(RoundRobinPool(2).props(routeeProps = Props(new Actor {
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
}))) ? "hello" pipeTo sender
|
||||
def receive = { case x ⇒ sender() ! x }
|
||||
}))) ? "hello" pipeTo sender()
|
||||
}
|
||||
})) ! "start"
|
||||
expectMsg("hello")
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ object ScatterGatherFirstCompletedSpec {
|
|||
case _id: Int if (_id == id) ⇒
|
||||
case x ⇒ {
|
||||
Thread sleep 100 * id
|
||||
sender ! id
|
||||
sender() ! id
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ object SerializationTests {
|
|||
|
||||
class FooActor extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
case s: String ⇒ sender() ! s
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -93,7 +93,7 @@ object SerializationTests {
|
|||
|
||||
class NonSerializableActor(system: ActorSystem) extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
case s: String ⇒ sender() ! s
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -356,7 +356,7 @@ object Actor {
|
|||
* - ''SHUTDOWN'' (when 'stop' is invoked) - can't do anything
|
||||
*
|
||||
* The Actor's own [[akka.actor.ActorRef]] is available as `self`, the current
|
||||
* message’s sender as `sender` and the [[akka.actor.ActorContext]] as
|
||||
* message’s sender as `sender()` and the [[akka.actor.ActorContext]] as
|
||||
* `context`. The only abstract method is `receive` which shall return the
|
||||
* initial behavior of the actor as a partial function (behavior can be changed
|
||||
* using `context.become` and `context.unbecome`).
|
||||
|
|
@ -375,23 +375,23 @@ object Actor {
|
|||
*
|
||||
* def receive = {
|
||||
* // directly calculated reply
|
||||
* case Request(r) => sender ! calculate(r)
|
||||
* case Request(r) => sender() ! calculate(r)
|
||||
*
|
||||
* // just to demonstrate how to stop yourself
|
||||
* case Shutdown => context.stop(self)
|
||||
*
|
||||
* // error kernel with child replying directly to 'sender'
|
||||
* case Dangerous(r) => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender)
|
||||
* // error kernel with child replying directly to 'sender()'
|
||||
* case Dangerous(r) => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender())
|
||||
*
|
||||
* // error kernel with reply going through us
|
||||
* case OtherJob(r) => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender)
|
||||
* case OtherJob(r) => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender())
|
||||
* case JobReply(result, orig_s) => orig_s ! result
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* The last line demonstrates the essence of the error kernel design: spawn
|
||||
* one-off actors which terminate after doing their job, pass on `sender` to
|
||||
* one-off actors which terminate after doing their job, pass on `sender()` to
|
||||
* allow direct reply if that is what makes sense, or round-trip the sender
|
||||
* as shown with the fictitious JobRequest/JobReply message pair.
|
||||
*
|
||||
|
|
@ -446,7 +446,7 @@ trait Actor {
|
|||
* WARNING: Only valid within the Actor itself, so do not close over it and
|
||||
* publish it to other threads!
|
||||
*/
|
||||
final def sender: ActorRef = context.sender
|
||||
final def sender(): ActorRef = context.sender()
|
||||
|
||||
/**
|
||||
* This defines the initial actor behavior, it must return a partial function
|
||||
|
|
@ -561,7 +561,7 @@ trait Actor {
|
|||
def unhandled(message: Any): Unit = {
|
||||
message match {
|
||||
case Terminated(dead) ⇒ throw new DeathPactException(dead)
|
||||
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender, self))
|
||||
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ trait ActorContext extends ActorRefFactory {
|
|||
/**
|
||||
* Returns the sender 'ActorRef' of the current message.
|
||||
*/
|
||||
def sender: ActorRef
|
||||
def sender(): ActorRef
|
||||
|
||||
/**
|
||||
* Returns all supervised children; this method returns a view (i.e. a lazy
|
||||
|
|
@ -478,15 +478,15 @@ private[akka] class ActorCell(
|
|||
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||
case PoisonPill ⇒ self.stop()
|
||||
case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
|
||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self))
|
||||
case Identify(messageId) ⇒ sender() ! ActorIdentity(messageId, Some(self))
|
||||
}
|
||||
}
|
||||
|
||||
private def receiveSelection(sel: ActorSelectionMessage): Unit =
|
||||
if (sel.elements.isEmpty)
|
||||
invoke(Envelope(sel.msg, sender, system))
|
||||
invoke(Envelope(sel.msg, sender(), system))
|
||||
else
|
||||
ActorSelection.deliverSelection(self, sender, sel)
|
||||
ActorSelection.deliverSelection(self, sender(), sel)
|
||||
|
||||
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
|
||||
|
||||
|
|
@ -494,7 +494,7 @@ private[akka] class ActorCell(
|
|||
* ACTOR CONTEXT IMPLEMENTATION
|
||||
*/
|
||||
|
||||
final def sender: ActorRef = currentMessage match {
|
||||
final def sender(): ActorRef = currentMessage match {
|
||||
case null ⇒ system.deadLetters
|
||||
case msg if msg.sender ne null ⇒ msg.sender
|
||||
case _ ⇒ system.deadLetters
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ object ActorDSL extends dsl.Inbox with dsl.Creators {
|
|||
|
||||
val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(
|
||||
new Actor {
|
||||
def receive = { case any ⇒ sender ! any }
|
||||
def receive = { case any ⇒ sender() ! any }
|
||||
}), "dsl").asInstanceOf[RepointableActorRef]
|
||||
|
||||
{
|
||||
|
|
|
|||
|
|
@ -45,10 +45,10 @@ object ActorRef {
|
|||
*
|
||||
* def receive {
|
||||
* case Request1(msg) => other ! refine(msg) // uses this actor as sender reference, reply goes to us
|
||||
* case Request2(msg) => other.tell(msg, sender) // forward sender reference, enabling direct reply
|
||||
* case Request2(msg) => other.tell(msg, sender()) // forward sender reference, enabling direct reply
|
||||
* case Request3(msg) =>
|
||||
* implicit val timeout = Timeout(5.seconds)
|
||||
* sender ! (other ? msg) // will reply with a Future for holding other's reply
|
||||
* sender() ! (other ? msg) // will reply with a Future for holding other's reply
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
|
|
@ -127,7 +127,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
|||
*
|
||||
* Works, no matter whether originally sent with tell/'!' or ask/'?'.
|
||||
*/
|
||||
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
|
||||
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())
|
||||
|
||||
/**
|
||||
* Is the actor shut down?
|
||||
|
|
@ -168,7 +168,7 @@ trait ScalaActorRef { ref: ActorRef ⇒
|
|||
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
|
||||
* <p/>
|
||||
*
|
||||
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
|
||||
* This actor 'sender' reference is then available in the receiving actor in the 'sender()' member variable,
|
||||
* if invoked from within an Actor. If not then no sender is available.
|
||||
* <pre>
|
||||
* actor ! message
|
||||
|
|
|
|||
|
|
@ -370,7 +370,7 @@ private[akka] object LocalActorRefProvider {
|
|||
def receive = {
|
||||
case Terminated(_) ⇒ context.stop(self)
|
||||
case StopChild(child) ⇒ context.stop(child)
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender, self)
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender(), self)
|
||||
}
|
||||
|
||||
// guardian MUST NOT lose its children during restart
|
||||
|
|
@ -399,16 +399,16 @@ private[akka] object LocalActorRefProvider {
|
|||
// termination process of guardian has started
|
||||
terminationHooks -= a
|
||||
case StopChild(child) ⇒ context.stop(child)
|
||||
case RegisterTerminationHook if sender != context.system.deadLetters ⇒
|
||||
terminationHooks += sender
|
||||
context watch sender
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender, self)
|
||||
case RegisterTerminationHook if sender() != context.system.deadLetters ⇒
|
||||
terminationHooks += sender()
|
||||
context watch sender()
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender(), self)
|
||||
}
|
||||
|
||||
def terminating: Receive = {
|
||||
case Terminated(a) ⇒ stopWhenAllTerminationHooksDone(a)
|
||||
case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender)
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender, self)
|
||||
case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender())
|
||||
case m ⇒ context.system.deadLetters forward DeadLetter(m, sender(), self)
|
||||
}
|
||||
|
||||
def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = {
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ abstract class ActorSelection extends Serializable {
|
|||
*
|
||||
* Works, no matter whether originally sent with tell/'!' or ask/'?'.
|
||||
*/
|
||||
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
|
||||
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())
|
||||
|
||||
/**
|
||||
* Resolve the [[ActorRef]] matching this selection.
|
||||
|
|
|
|||
|
|
@ -573,7 +573,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
timeoutFuture = None
|
||||
}
|
||||
generation += 1
|
||||
processMsg(value, sender)
|
||||
processMsg(value, sender())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -597,7 +597,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
nextState.stopReason match {
|
||||
case None ⇒ makeTransition(nextState)
|
||||
case _ ⇒
|
||||
nextState.replies.reverse foreach { r ⇒ sender ! r }
|
||||
nextState.replies.reverse foreach { r ⇒ sender() ! r }
|
||||
terminate(nextState)
|
||||
context.stop(self)
|
||||
}
|
||||
|
|
@ -607,7 +607,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
if (!stateFunctions.contains(nextState.stateName)) {
|
||||
terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
|
||||
} else {
|
||||
nextState.replies.reverse foreach { r ⇒ sender ! r }
|
||||
nextState.replies.reverse foreach { r ⇒ sender() ! r }
|
||||
if (currentState.stateName != nextState.stateName) {
|
||||
this.nextState = nextState
|
||||
handleTransition(currentState.stateName, nextState.stateName)
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
if (m.isOneWay) m(me)
|
||||
else {
|
||||
try {
|
||||
val s = sender
|
||||
val s = sender()
|
||||
m(me) match {
|
||||
case f: Future[_] if m.returnsFuture ⇒
|
||||
implicit val dispatcher = context.dispatcher
|
||||
|
|
@ -321,14 +321,14 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
sender ! Status.Failure(e)
|
||||
sender() ! Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case msg if me.isInstanceOf[Receiver] ⇒ withContext {
|
||||
me.asInstanceOf[Receiver].onReceive(msg, sender)
|
||||
me.asInstanceOf[Receiver].onReceive(msg, sender())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ abstract class UntypedActor extends Actor {
|
|||
* always a legal destination to send to, even if there is no logical recipient
|
||||
* for the reply, in which case it will be sent to the dead letter mailbox.
|
||||
*/
|
||||
def getSender(): ActorRef = sender
|
||||
def getSender(): ActorRef = sender()
|
||||
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ trait Inbox { this: ActorDSL.type ⇒
|
|||
var printedWarning = false
|
||||
|
||||
def enqueueQuery(q: Query) {
|
||||
val query = q withClient sender
|
||||
val query = q withClient sender()
|
||||
clients enqueue query
|
||||
clientsByTimeout += query
|
||||
}
|
||||
|
|
@ -96,13 +96,13 @@ trait Inbox { this: ActorDSL.type ⇒
|
|||
def receive = ({
|
||||
case g: Get ⇒
|
||||
if (messages.isEmpty) enqueueQuery(g)
|
||||
else sender ! messages.dequeue()
|
||||
else sender() ! messages.dequeue()
|
||||
case s @ Select(_, predicate, _) ⇒
|
||||
if (messages.isEmpty) enqueueQuery(s)
|
||||
else {
|
||||
currentSelect = s
|
||||
messages.dequeueFirst(messagePredicate) match {
|
||||
case Some(msg) ⇒ sender ! msg
|
||||
case Some(msg) ⇒ sender() ! msg
|
||||
case None ⇒ enqueueQuery(s)
|
||||
}
|
||||
currentSelect = null
|
||||
|
|
|
|||
|
|
@ -793,7 +793,7 @@ object Logging {
|
|||
*/
|
||||
class DefaultLogger extends Actor with StdOutLogger {
|
||||
override def receive: Receive = {
|
||||
case InitializeLogger(_) ⇒ sender ! LoggerInitialized
|
||||
case InitializeLogger(_) ⇒ sender() ! LoggerInitialized
|
||||
case event: LogEvent ⇒ print(event)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ private[io] object SelectionHandler {
|
|||
name = "selectors")
|
||||
|
||||
final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = {
|
||||
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
|
||||
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender(), pf(cmd))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
|
||||
case cmd: CloseCommand ⇒
|
||||
val info = ConnectionInfo(registration, commander, keepOpenOnPeerClosed = false, useResumeWriting = false)
|
||||
handleClose(info, Some(sender), cmd.event)
|
||||
handleClose(info, Some(sender()), cmd.event)
|
||||
|
||||
case ReceiveTimeout ⇒
|
||||
// after sending `Register` user should watch this actor to make sure
|
||||
|
|
@ -76,13 +76,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
case SuspendReading ⇒ suspendReading(info)
|
||||
case ResumeReading ⇒ resumeReading(info)
|
||||
case ChannelReadable ⇒ doRead(info, None)
|
||||
case cmd: CloseCommand ⇒ handleClose(info, Some(sender), cmd.event)
|
||||
case cmd: CloseCommand ⇒ handleClose(info, Some(sender()), cmd.event)
|
||||
}
|
||||
|
||||
/** the peer sent EOF first, but we may still want to send */
|
||||
def peerSentEOF(info: ConnectionInfo): Receive =
|
||||
handleWriteMessages(info) orElse {
|
||||
case cmd: CloseCommand ⇒ handleClose(info, Some(sender), cmd.event)
|
||||
case cmd: CloseCommand ⇒ handleClose(info, Some(sender()), cmd.event)
|
||||
}
|
||||
|
||||
/** connection is closing but a write has to be finished first */
|
||||
|
|
@ -104,7 +104,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
|
||||
case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task
|
||||
|
||||
case Abort ⇒ handleClose(info, Some(sender), Aborted)
|
||||
case Abort ⇒ handleClose(info, Some(sender()), Aborted)
|
||||
}
|
||||
|
||||
/** connection is closed on our side and we're waiting from confirmation from the other side */
|
||||
|
|
@ -112,7 +112,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
case SuspendReading ⇒ suspendReading(info)
|
||||
case ResumeReading ⇒ resumeReading(info)
|
||||
case ChannelReadable ⇒ doRead(info, closeCommander)
|
||||
case Abort ⇒ handleClose(info, Some(sender), Aborted)
|
||||
case Abort ⇒ handleClose(info, Some(sender()), Aborted)
|
||||
}
|
||||
|
||||
def handleWriteMessages(info: ConnectionInfo): Receive = {
|
||||
|
|
@ -128,15 +128,15 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
case write: WriteCommand ⇒
|
||||
if (writingSuspended) {
|
||||
if (TraceLogging) log.debug("Dropping write because writing is suspended")
|
||||
sender ! write.failureMessage
|
||||
sender() ! write.failureMessage
|
||||
|
||||
} else if (writePending) {
|
||||
if (TraceLogging) log.debug("Dropping write because queue is full")
|
||||
sender ! write.failureMessage
|
||||
sender() ! write.failureMessage
|
||||
if (info.useResumeWriting) writingSuspended = true
|
||||
|
||||
} else {
|
||||
pendingWrite = PendingWrite(sender, write)
|
||||
pendingWrite = PendingWrite(sender(), write)
|
||||
if (writePending) doWrite(info)
|
||||
}
|
||||
|
||||
|
|
@ -153,9 +153,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
*/
|
||||
writingSuspended = false
|
||||
if (writePending) {
|
||||
if (interestedInResume.isEmpty) interestedInResume = Some(sender)
|
||||
else sender ! CommandFailed(ResumeWriting)
|
||||
} else sender ! WritingResumed
|
||||
if (interestedInResume.isEmpty) interestedInResume = Some(sender())
|
||||
else sender() ! CommandFailed(ResumeWriting)
|
||||
} else sender() ! WritingResumed
|
||||
|
||||
case UpdatePendingWrite(remaining) ⇒
|
||||
pendingWrite = remaining
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
|
|||
case Unbind ⇒
|
||||
log.debug("Unbinding endpoint {}", localAddress)
|
||||
channel.close()
|
||||
sender ! Unbound
|
||||
sender() ! Unbound
|
||||
log.debug("Unbound endpoint {}, stopping listener", localAddress)
|
||||
context.stop(self)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import akka.actor.{ ActorLogging, Props }
|
|||
*
|
||||
* To bind and listen to a local address, a [[akka.io.Tcp.Bind]] command must be sent to this actor. If the binding
|
||||
* was successful, the sender of the [[akka.io.Tcp.Bind]] will be notified with a [[akka.io.Tcp.Bound]]
|
||||
* message. The sender of the [[akka.io.Tcp.Bound]] message is the Listener actor (an internal actor responsible for
|
||||
* message. The sender() of the [[akka.io.Tcp.Bound]] message is the Listener actor (an internal actor responsible for
|
||||
* listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor.
|
||||
*
|
||||
* If the bind request is rejected because the Tcp system is not able to register more channels (see the nr-of-selectors
|
||||
|
|
@ -32,7 +32,7 @@ import akka.actor.{ ActorLogging, Props }
|
|||
* == Connect ==
|
||||
*
|
||||
* To initiate a connection to a remote server, a [[akka.io.Tcp.Connect]] message must be sent to this actor. If the
|
||||
* connection succeeds, the sender will be notified with a [[akka.io.Tcp.Connected]] message. The sender of the
|
||||
* connection succeeds, the sender() will be notified with a [[akka.io.Tcp.Connected]] message. The sender of the
|
||||
* [[akka.io.Tcp.Connected]] message is the Connection actor (an internal actor representing the TCP connection). Before
|
||||
* starting to use the connection, a handler must be registered to the Connection actor by sending a [[akka.io.Tcp.Register]]
|
||||
* command message. After a handler has been registered, all incoming data will be sent to the handler in the form of
|
||||
|
|
@ -49,11 +49,11 @@ private[io] class TcpManager(tcp: TcpExt)
|
|||
|
||||
def receive = workerForCommandHandler {
|
||||
case c: Connect ⇒
|
||||
val commander = sender // cache because we create a function that will run asyncly
|
||||
val commander = sender() // cache because we create a function that will run asyncly
|
||||
(registry ⇒ Props(classOf[TcpOutgoingConnection], tcp, registry, commander, c))
|
||||
|
||||
case b: Bind ⇒
|
||||
val commander = sender // cache because we create a function that will run asyncly
|
||||
val commander = sender() // cache because we create a function that will run asyncly
|
||||
(registry ⇒ Props(classOf[TcpListener], selectorPool, tcp, registry, commander, b))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ private[io] class UdpConnectedManager(udpConn: UdpConnectedExt)
|
|||
|
||||
def receive = workerForCommandHandler {
|
||||
case c: Connect ⇒
|
||||
val commander = sender // cache because we create a function that will run asyncly
|
||||
val commander = sender() // cache because we create a function that will run asyncly
|
||||
registry ⇒ Props(classOf[UdpConnection], udpConn, registry, commander, c)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -65,20 +65,20 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
|
|||
case Disconnect ⇒
|
||||
log.debug("Closing UDP connection to [{}]", remoteAddress)
|
||||
channel.close()
|
||||
sender ! Disconnected
|
||||
sender() ! Disconnected
|
||||
log.debug("Connection closed to [{}], stopping listener", remoteAddress)
|
||||
context.stop(self)
|
||||
|
||||
case send: Send if writePending ⇒
|
||||
if (TraceLogging) log.debug("Dropping write because queue is full")
|
||||
sender ! CommandFailed(send)
|
||||
sender() ! CommandFailed(send)
|
||||
|
||||
case send: Send if send.payload.isEmpty ⇒
|
||||
if (send.wantsAck)
|
||||
sender ! send.ack
|
||||
sender() ! send.ack
|
||||
|
||||
case send: Send ⇒
|
||||
pendingSend = (send, sender)
|
||||
pendingSend = (send, sender())
|
||||
registration.enableInterest(OP_WRITE)
|
||||
|
||||
case ChannelWritable ⇒ doWrite()
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ private[io] class UdpListener(val udp: UdpExt,
|
|||
log.debug("Unbinding endpoint [{}]", bind.localAddress)
|
||||
try {
|
||||
channel.close()
|
||||
sender ! Unbound
|
||||
sender() ! Unbound
|
||||
log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress)
|
||||
} finally context.stop(self)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,11 +47,11 @@ private[io] class UdpManager(udp: UdpExt) extends SelectionHandler.SelectorBased
|
|||
|
||||
def receive = workerForCommandHandler {
|
||||
case b: Bind ⇒
|
||||
val commander = sender // cache because we create a function that will run asyncly
|
||||
val commander = sender() // cache because we create a function that will run asyncly
|
||||
(registry ⇒ Props(classOf[UdpListener], udp, registry, commander, b))
|
||||
|
||||
case SimpleSender(options) ⇒
|
||||
val commander = sender // cache because we create a function that will run asyncly
|
||||
val commander = sender() // cache because we create a function that will run asyncly
|
||||
(registry ⇒ Props(classOf[UdpSender], udp, registry, commander, options))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,15 +30,15 @@ private[io] trait WithUdpSend {
|
|||
def sendHandlers(registration: ChannelRegistration): Receive = {
|
||||
case send: Send if hasWritePending ⇒
|
||||
if (TraceLogging) log.debug("Dropping write because queue is full")
|
||||
sender ! CommandFailed(send)
|
||||
sender() ! CommandFailed(send)
|
||||
|
||||
case send: Send if send.payload.isEmpty ⇒
|
||||
if (send.wantsAck)
|
||||
sender ! send.ack
|
||||
sender() ! send.ack
|
||||
|
||||
case send: Send ⇒
|
||||
pendingSend = send
|
||||
pendingCommander = sender
|
||||
pendingCommander = sender()
|
||||
doSend(registration)
|
||||
|
||||
case ChannelWritable ⇒ if (hasWritePending) doSend(registration)
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ trait Listeners { self: Actor ⇒
|
|||
}
|
||||
|
||||
/**
|
||||
* Sends the supplied message to all current listeners using the provided sender as sender.
|
||||
* Sends the supplied message to all current listeners using the provided sender() as sender.
|
||||
*
|
||||
* @param msg
|
||||
* @param sender
|
||||
|
|
|
|||
|
|
@ -166,9 +166,9 @@ private[akka] class RouterActor extends Actor {
|
|||
|
||||
def receive = {
|
||||
case GetRoutees ⇒
|
||||
sender ! Routees(cell.router.routees)
|
||||
sender() ! Routees(cell.router.routees)
|
||||
case CurrentRoutees ⇒
|
||||
context.actorOf(Props(classOf[CollectRouteeRefs], cell.router.routees, sender))
|
||||
context.actorOf(Props(classOf[CollectRouteeRefs], cell.router.routees, sender()))
|
||||
case AddRoutee(routee) ⇒
|
||||
cell.addRoutee(routee)
|
||||
case RemoveRoutee(routee) ⇒
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ trait ProducerSupport extends Actor with CamelSupport {
|
|||
case msg ⇒
|
||||
producerChild match {
|
||||
case Some(child) ⇒ child forward transformOutgoingMessage(msg)
|
||||
case None ⇒ messages :+= ((sender, msg))
|
||||
case None ⇒ messages :+= ((sender(), msg))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -103,7 +103,7 @@ trait ProducerSupport extends Actor with CamelSupport {
|
|||
* actor).
|
||||
*/
|
||||
|
||||
protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg)
|
||||
protected def routeResponse(msg: Any): Unit = if (!oneway) sender() ! transformResponse(msg)
|
||||
|
||||
private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor {
|
||||
def receive = {
|
||||
|
|
@ -129,7 +129,7 @@ trait ProducerSupport extends Actor with CamelSupport {
|
|||
// Need copies of sender reference here since the callback could be done
|
||||
// later by another thread.
|
||||
val producer = self
|
||||
val originalSender = sender
|
||||
val originalSender = sender()
|
||||
val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
|
||||
val cmsg = CamelMessage.canonicalize(msg)
|
||||
xchg.setRequest(cmsg)
|
||||
|
|
|
|||
|
|
@ -31,8 +31,8 @@ private[camel] class ActivationTracker extends Actor with ActorLogging {
|
|||
var awaitingDeActivation = List[ActorRef]()
|
||||
|
||||
{
|
||||
case AwaitActivation(ref) ⇒ awaitingActivation ::= sender
|
||||
case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender
|
||||
case AwaitActivation(ref) ⇒ awaitingActivation ::= sender()
|
||||
case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender()
|
||||
case msg @ EndpointActivated(ref) ⇒
|
||||
awaitingActivation.foreach(_ ! msg)
|
||||
receive = activated(awaitingDeActivation)
|
||||
|
|
@ -51,8 +51,8 @@ private[camel] class ActivationTracker extends Actor with ActorLogging {
|
|||
var awaitingDeActivation = currentAwaitingDeActivation
|
||||
|
||||
{
|
||||
case AwaitActivation(ref) ⇒ sender ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender
|
||||
case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender()
|
||||
case msg @ EndpointDeActivated(ref) ⇒
|
||||
awaitingDeActivation foreach (_ ! msg)
|
||||
receive = deactivated
|
||||
|
|
@ -67,9 +67,9 @@ private[camel] class ActivationTracker extends Actor with ActorLogging {
|
|||
* @return a partial function that handles messages in the 'de-activated' state
|
||||
*/
|
||||
def deactivated: State = {
|
||||
// deactivated means it was activated at some point, so tell sender it was activated
|
||||
case AwaitActivation(ref) ⇒ sender ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ sender ! EndpointDeActivated(ref)
|
||||
// deactivated means it was activated at some point, so tell sender() it was activated
|
||||
case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ sender() ! EndpointDeActivated(ref)
|
||||
//resurrected at restart.
|
||||
case msg @ EndpointActivated(ref) ⇒
|
||||
receive = activated(Nil)
|
||||
|
|
@ -81,8 +81,8 @@ private[camel] class ActivationTracker extends Actor with ActorLogging {
|
|||
* @return a partial function that handles messages in 'failed to activate' state
|
||||
*/
|
||||
def failedToActivate(cause: Throwable): State = {
|
||||
case AwaitActivation(ref) ⇒ sender ! EndpointFailedToActivate(ref, cause)
|
||||
case AwaitDeActivation(ref) ⇒ sender ! EndpointFailedToActivate(ref, cause)
|
||||
case AwaitActivation(ref) ⇒ sender() ! EndpointFailedToActivate(ref, cause)
|
||||
case AwaitDeActivation(ref) ⇒ sender() ! EndpointFailedToActivate(ref, cause)
|
||||
case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring.
|
||||
}
|
||||
|
||||
|
|
@ -92,8 +92,8 @@ private[camel] class ActivationTracker extends Actor with ActorLogging {
|
|||
* @return a partial function that handles messages in 'failed to de-activate' state
|
||||
*/
|
||||
def failedToDeActivate(cause: Throwable): State = {
|
||||
case AwaitActivation(ref) ⇒ sender ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ sender ! EndpointFailedToDeActivate(ref, cause)
|
||||
case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref)
|
||||
case AwaitDeActivation(ref) ⇒ sender() ! EndpointFailedToDeActivate(ref, cause)
|
||||
case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring.
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ class ActivationIntegrationTest extends WordSpec with Matchers with SharedCamelS
|
|||
}
|
||||
|
||||
override def receive = {
|
||||
case msg: CamelMessage ⇒ sender ! "received " + msg.body
|
||||
case msg: CamelMessage ⇒ sender() ! "received " + msg.body
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ class EchoConsumer(endpoint: String) extends Actor with Consumer {
|
|||
def endpointUri = endpoint
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ sender ! msg
|
||||
case msg: CamelMessage ⇒ sender() ! msg
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamel
|
|||
start(new Consumer {
|
||||
def endpointUri = "direct:a1"
|
||||
def receive = {
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
case m: CamelMessage ⇒ sender() ! "received " + m.bodyAs[String]
|
||||
}
|
||||
}, name = "direct-a1")
|
||||
camel.sendTo("direct:a1", msg = "some message") should be("received some message")
|
||||
|
|
@ -54,7 +54,7 @@ class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamel
|
|||
val ref = start(new Consumer {
|
||||
override def replyTimeout = SHORT_TIMEOUT
|
||||
def endpointUri = "direct:a3"
|
||||
def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender ! "done" } }
|
||||
def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender() ! "done" } }
|
||||
}, name = "ignore-this-deadletter-timeout-consumer-reply")
|
||||
|
||||
intercept[CamelExecutionException] {
|
||||
|
|
@ -71,7 +71,7 @@ class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamel
|
|||
|
||||
def receive = {
|
||||
case "throw" ⇒ throw new TestException("")
|
||||
case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String]
|
||||
case m: CamelMessage ⇒ sender() ! "received " + m.bodyAs[String]
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
|
|
@ -138,7 +138,7 @@ class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamel
|
|||
"Consumer supports manual Ack" in {
|
||||
val ref = start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Ack }
|
||||
def receive = { case _ ⇒ sender() ! Ack }
|
||||
}, name = "direct-manual-ack-1")
|
||||
camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) should be(null) //should not timeout
|
||||
stop(ref)
|
||||
|
|
@ -148,7 +148,7 @@ class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamel
|
|||
val someException = new Exception("e1")
|
||||
val ref = start(new ManualAckConsumer() {
|
||||
def endpointUri = "direct:manual-ack"
|
||||
def receive = { case _ ⇒ sender ! Failure(someException) }
|
||||
def receive = { case _ ⇒ sender() ! Failure(someException) }
|
||||
}, name = "direct-manual-ack-2")
|
||||
|
||||
intercept[ExecutionException] {
|
||||
|
|
@ -186,7 +186,7 @@ class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
|
|||
}
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
super.preRestart(reason, message)
|
||||
sender ! Failure(reason)
|
||||
sender() ! Failure(reason)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -201,7 +201,7 @@ class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer
|
|||
|
||||
final override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
super.preRestart(reason, message)
|
||||
sender ! Failure(reason)
|
||||
sender() ! Failure(reason)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -210,14 +210,14 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer {
|
|||
def receive = {
|
||||
case msg: CamelMessage ⇒
|
||||
if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false))
|
||||
sender ! ("accepted: %s" format msg.body)
|
||||
sender() ! ("accepted: %s" format msg.body)
|
||||
else
|
||||
throw new TestException("rejected: %s" format msg.body)
|
||||
}
|
||||
|
||||
final override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
super.preRestart(reason, message)
|
||||
sender ! Failure(reason)
|
||||
sender() ! Failure(reason)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class ProducerFeatureTest extends TestKit(ActorSystem("test", AkkaSpec.testConf)
|
|||
case p: Props ⇒ {
|
||||
val producer = context.actorOf(p)
|
||||
context.watch(producer)
|
||||
sender ! producer
|
||||
sender() ! producer
|
||||
}
|
||||
case Terminated(actorRef) ⇒ {
|
||||
deadActor = Some(actorRef)
|
||||
|
|
@ -275,7 +275,7 @@ object ProducerFeatureTest {
|
|||
body: String ⇒
|
||||
if (body == "err") throw new Exception("Crash!")
|
||||
val upperMsg = body.toUpperCase
|
||||
lastSender = Some(sender)
|
||||
lastSender = Some(sender())
|
||||
lastMessage = Some(upperMsg)
|
||||
}
|
||||
else msg
|
||||
|
|
@ -314,9 +314,9 @@ object ProducerFeatureTest {
|
|||
class TestResponder extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ msg.body match {
|
||||
case "fail" ⇒ context.sender ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers))
|
||||
case "fail" ⇒ context.sender() ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers))
|
||||
case _ ⇒
|
||||
context.sender ! (msg.mapBody {
|
||||
context.sender() ! (msg.mapBody {
|
||||
body: String ⇒ "received %s" format body
|
||||
})
|
||||
}
|
||||
|
|
@ -326,10 +326,10 @@ object ProducerFeatureTest {
|
|||
class ReplyingForwardTarget extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒
|
||||
context.sender ! (msg.copy(headers = msg.headers + ("test" -> "result")))
|
||||
context.sender() ! (msg.copy(headers = msg.headers + ("test" -> "result")))
|
||||
case msg: akka.actor.Status.Failure ⇒
|
||||
msg.cause match {
|
||||
case e: AkkaCamelException ⇒ context.sender ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
|
||||
case e: AkkaCamelException ⇒ context.sender() ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -368,8 +368,8 @@ object ProducerFeatureTest {
|
|||
|
||||
class IntermittentErrorConsumer(override val endpointUri: String) extends Consumer {
|
||||
def receive = {
|
||||
case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender ! Failure(new Exception("fail"))
|
||||
case msg: CamelMessage ⇒ sender ! msg
|
||||
case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender() ! Failure(new Exception("fail"))
|
||||
case msg: CamelMessage ⇒ sender() ! msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
info("message sent to consumer")
|
||||
probe.sender ! Ack
|
||||
probe.sender() ! Ack
|
||||
}
|
||||
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||
info("no response forwarded to exchange")
|
||||
|
|
@ -173,7 +173,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
|
||||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
probe.sender ! "some message"
|
||||
probe.sender() ! "some message"
|
||||
}
|
||||
doneSync should be(false)
|
||||
info("done async")
|
||||
|
|
@ -196,7 +196,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
|
||||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
probe.sender ! failure
|
||||
probe.sender() ! failure
|
||||
asyncCallback.awaitCalled(remaining)
|
||||
}
|
||||
|
||||
|
|
@ -259,7 +259,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
info("message sent to consumer")
|
||||
probe.sender ! Ack
|
||||
probe.sender() ! Ack
|
||||
asyncCallback.expectDoneAsyncWithin(remaining)
|
||||
info("async callback called")
|
||||
}
|
||||
|
|
@ -277,7 +277,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
info("message sent to consumer")
|
||||
probe.sender ! "some neither Ack nor Failure response"
|
||||
probe.sender() ! "some neither Ack nor Failure response"
|
||||
asyncCallback.expectDoneAsyncWithin(remaining)
|
||||
info("async callback called")
|
||||
}
|
||||
|
|
@ -310,7 +310,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike w
|
|||
within(1 second) {
|
||||
probe.expectMsgType[CamelMessage]
|
||||
info("message sent to consumer")
|
||||
probe.sender ! Failure(new Exception)
|
||||
probe.sender() ! Failure(new Exception)
|
||||
asyncCallback.awaitCalled(remaining)
|
||||
}
|
||||
verify(exchange, never()).setResponse(any[CamelMessage])
|
||||
|
|
@ -427,7 +427,7 @@ private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAft
|
|||
}
|
||||
|
||||
def echoActor = system.actorOf(Props(new Actor {
|
||||
def receive = { case msg ⇒ sender ! "received " + msg }
|
||||
def receive = { case msg ⇒ sender() ! "received " + msg }
|
||||
}), name = "echoActor")
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -206,7 +206,7 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
|
|||
override def postStop(): Unit = Cluster(context.system).shutdown()
|
||||
|
||||
def receive = {
|
||||
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! coreDaemon
|
||||
case InternalClusterAction.GetClusterCoreRef ⇒ sender() ! coreDaemon
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -289,7 +289,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
|
||||
def uninitialized: Actor.Receive = {
|
||||
case InitJoin ⇒ sender ! InitJoinNack(selfAddress)
|
||||
case InitJoin ⇒ sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
|
|
@ -297,7 +297,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = {
|
||||
case Welcome(from, gossip) ⇒ welcome(joinWith, from, gossip)
|
||||
case InitJoin ⇒ sender ! InitJoinNack(selfAddress)
|
||||
case InitJoin ⇒ sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒
|
||||
becomeUninitialized()
|
||||
join(address)
|
||||
|
|
@ -364,7 +364,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
case other ⇒ super.unhandled(other)
|
||||
}
|
||||
|
||||
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
|
||||
def initJoin(): Unit = sender() ! InitJoinAck(selfAddress)
|
||||
|
||||
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
if (seedNodes.nonEmpty) {
|
||||
|
|
@ -464,7 +464,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
|
||||
if (node != selfUniqueAddress) {
|
||||
sender ! Welcome(selfUniqueAddress, latestGossip)
|
||||
sender() ! Welcome(selfUniqueAddress, latestGossip)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
|
|
@ -484,7 +484,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
latestGossip = gossip seen selfUniqueAddress
|
||||
publish(latestGossip)
|
||||
if (from != selfUniqueAddress)
|
||||
gossipTo(from, sender)
|
||||
gossipTo(from, sender())
|
||||
becomeInitialized()
|
||||
}
|
||||
}
|
||||
|
|
@ -574,8 +574,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
else {
|
||||
(status.version compareTo latestGossip.version) match {
|
||||
case VectorClock.Same ⇒ // same version
|
||||
case VectorClock.After ⇒ gossipStatusTo(from, sender) // remote is newer
|
||||
case _ ⇒ gossipTo(from, sender) // conflicting or local is newer
|
||||
case VectorClock.After ⇒ gossipStatusTo(from, sender()) // remote is newer
|
||||
case _ ⇒ gossipTo(from, sender()) // conflicting or local is newer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -664,9 +664,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
if (selfStatus == Exiting || selfStatus == Down)
|
||||
shutdown()
|
||||
else if (talkback) {
|
||||
// send back gossip to sender when sender had different view, i.e. merge, or sender had
|
||||
// older or sender had newer
|
||||
gossipTo(from, sender)
|
||||
// send back gossip to sender() when sender() had different view, i.e. merge, or sender() had
|
||||
// older or sender() had newer
|
||||
gossipTo(from, sender())
|
||||
}
|
||||
gossipType
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
|
|||
val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress)
|
||||
|
||||
def receive = {
|
||||
case Heartbeat(from) ⇒ sender ! selfHeartbeatRsp
|
||||
case Heartbeat(from) ⇒ sender() ! selfHeartbeatRsp
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ final case class AdaptiveLoadBalancingRoutingLogic(system: ActorSystem, metricsS
|
|||
|
||||
// The current weighted routees, if any. Weights are produced by the metricsSelector
|
||||
// via the metricsListener Actor. It's only updated by the actor, but accessed from
|
||||
// the threads of the senders.
|
||||
// the threads of the sender()s.
|
||||
private val weightedRouteesRef =
|
||||
new AtomicReference[(immutable.IndexedSeq[Routee], Set[NodeMetrics], Option[WeightedRoutees])](
|
||||
(Vector.empty, Set.empty, None))
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ object MultiNodeClusterSpec {
|
|||
}
|
||||
case End ⇒
|
||||
testActor forward End
|
||||
sender ! EndAck
|
||||
sender() ! EndAck
|
||||
case EndAck ⇒
|
||||
testActor forward EndAck
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ abstract class RestartFirstSeedNodeSpec
|
|||
def receive = {
|
||||
case a: Address ⇒
|
||||
seedNode1Address = a
|
||||
sender ! "ok"
|
||||
sender() ! "ok"
|
||||
}
|
||||
}).withDeploy(Deploy.local), name = "address-receiver")
|
||||
enterBarrier("seed1-address-receiver-ready")
|
||||
|
|
|
|||
|
|
@ -520,8 +520,8 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
case SendBatch ⇒ sendJobs()
|
||||
case RetryTick ⇒ resend()
|
||||
case End ⇒
|
||||
done(sender)
|
||||
context.become(ending(sender))
|
||||
done(sender())
|
||||
context.become(ending(sender()))
|
||||
}
|
||||
|
||||
def ending(replyTo: ActorRef): Receive = {
|
||||
|
|
@ -571,7 +571,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
*/
|
||||
class Worker extends Actor with ActorLogging {
|
||||
def receive = {
|
||||
case SimpleJob(id, payload) ⇒ sender ! Ack(id)
|
||||
case SimpleJob(id, payload) ⇒ sender() ! Ack(id)
|
||||
case TreeJob(id, payload, idx, levels, width) ⇒
|
||||
// create the actors when first TreeJob message is received
|
||||
val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
|
||||
|
|
@ -583,7 +583,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
}
|
||||
|
||||
def treeWorker(tree: ActorRef): Receive = {
|
||||
case SimpleJob(id, payload) ⇒ sender ! Ack(id)
|
||||
case SimpleJob(id, payload) ⇒ sender() ! Ack(id)
|
||||
case TreeJob(id, payload, idx, _, _) ⇒
|
||||
tree forward ((idx, SimpleJob(id, payload)))
|
||||
}
|
||||
|
|
@ -602,7 +602,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
class Leaf extends Actor {
|
||||
def receive = {
|
||||
case (_: Int, job: SimpleJob) ⇒ sender ! Ack(job.id)
|
||||
case (_: Int, job: SimpleJob) ⇒ sender() ! Ack(job.id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -630,7 +630,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
def receive = {
|
||||
case props: Props ⇒ context.actorOf(props)
|
||||
case e: Exception ⇒ context.children foreach { _ ! e }
|
||||
case GetChildrenCount ⇒ sender ! ChildrenCount(context.children.size, restartCount)
|
||||
case GetChildrenCount ⇒ sender() ! ChildrenCount(context.children.size, restartCount)
|
||||
case Reset ⇒
|
||||
require(context.children.isEmpty,
|
||||
s"ResetChildrenCount not allowed when children exists, [${context.children.size}]")
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
class Parent extends Actor {
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
case p: Props ⇒ sender() ! context.actorOf(p)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -53,7 +53,7 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
|
|||
def receive = {
|
||||
case "hello" ⇒
|
||||
context.system.scheduler.scheduleOnce(2.seconds, self, "boom")
|
||||
sender ! "hello"
|
||||
sender() ! "hello"
|
||||
case "boom" ⇒ throw new SimulatedException
|
||||
}
|
||||
}
|
||||
|
|
@ -78,8 +78,8 @@ class SurviveNetworkInstabilityMultiJvmNode8 extends SurviveNetworkInstabilitySp
|
|||
|
||||
abstract class SurviveNetworkInstabilitySpec
|
||||
extends MultiNodeSpec(SurviveNetworkInstabilityMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender {
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender {
|
||||
|
||||
import SurviveNetworkInstabilityMultiJvmSpec._
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
class Routee extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress)
|
||||
case _ ⇒ sender() ! Reply(Cluster(context.system).selfAddress)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -45,7 +45,7 @@ object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
|
|||
val numberOfArrays = allocateBytes / 1024
|
||||
usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB
|
||||
log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed)
|
||||
sender ! "done"
|
||||
sender() ! "done"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
case _ ⇒ sender() ! self
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
|
|||
def this() = this(DeployRoutee)
|
||||
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! Reply(routeeType, self)
|
||||
case "hit" ⇒ sender() ! Reply(routeeType, self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue