Resolve merge conflict with master

This commit is contained in:
Viktor Klang 2011-09-19 19:00:58 +02:00
commit f9e23c3102
98 changed files with 480 additions and 546 deletions

View file

@ -2,6 +2,6 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor { public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) { public void onReceive(Object msg) {
getContext().tryReply("got it!"); tryReply("got it!");
} }
} }

View file

@ -21,9 +21,9 @@ object ActorFireForgetRequestReplySpec {
class ReplyActor extends Actor { class ReplyActor extends Actor {
def receive = { def receive = {
case "Send" case "Send"
self.reply("Reply") reply("Reply")
case "SendImplicit" case "SendImplicit"
self.channel ! "ReplyImplicit" channel ! "ReplyImplicit"
} }
} }

View file

@ -26,15 +26,15 @@ object ActorRefSpec {
def receive = { def receive = {
case "complexRequest" { case "complexRequest" {
replyTo = self.channel replyTo = channel
val worker = actorOf(Props[WorkerActor]) val worker = actorOf(Props[WorkerActor])
worker ! "work" worker ! "work"
} }
case "complexRequest2" case "complexRequest2"
val worker = actorOf(Props[WorkerActor]) val worker = actorOf(Props[WorkerActor])
worker ! self.channel worker ! channel
case "workDone" replyTo ! "complexReply" case "workDone" replyTo ! "complexReply"
case "simpleRequest" self.reply("simpleReply") case "simpleRequest" reply("simpleReply")
} }
} }
@ -42,7 +42,7 @@ object ActorRefSpec {
def receive = { def receive = {
case "work" { case "work" {
work work
self.reply("workDone") reply("workDone")
self.stop() self.stop()
} }
case replyTo: Channel[Any] { case replyTo: Channel[Any] {
@ -73,7 +73,7 @@ object ActorRefSpec {
class OuterActor(val inner: ActorRef) extends Actor { class OuterActor(val inner: ActorRef) extends Actor {
def receive = { def receive = {
case "self" self reply self case "self" reply(self)
case x inner forward x case x inner forward x
} }
} }
@ -82,7 +82,7 @@ object ActorRefSpec {
val fail = new InnerActor val fail = new InnerActor
def receive = { def receive = {
case "self" self reply self case "self" reply(self)
case x inner forward x case x inner forward x
} }
} }
@ -93,8 +93,8 @@ object ActorRefSpec {
class InnerActor extends Actor { class InnerActor extends Actor {
def receive = { def receive = {
case "innerself" self reply self case "innerself" reply(self)
case other self reply other case other reply(other)
} }
} }
@ -102,8 +102,8 @@ object ActorRefSpec {
val fail = new InnerActor val fail = new InnerActor
def receive = { def receive = {
case "innerself" self reply self case "innerself" reply(self)
case other self reply other case other reply(other)
} }
} }
@ -258,7 +258,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
"support nested actorOfs" in { "support nested actorOfs" in {
val a = actorOf(new Actor { val a = actorOf(new Actor {
val nested = actorOf(new Actor { def receive = { case _ } }) val nested = actorOf(new Actor { def receive = { case _ } })
def receive = { case _ self reply nested } def receive = { case _ reply(nested) }
}) })
val nested = (a ? "any").as[ActorRef].get val nested = (a ? "any").as[ActorRef].get
@ -304,9 +304,11 @@ class ActorRefSpec extends WordSpec with MustMatchers {
"stop when sent a poison pill" in { "stop when sent a poison pill" in {
val timeout = Timeout(20000) val timeout = Timeout(20000)
val ref = actorOf(Props(self { val ref = actorOf(Props(new Actor {
case 5 self tryReply "five" def receive = {
case null self tryReply "null" case 5 tryReply("five")
case null tryReply("null")
}
})) }))
val ffive = (ref ? (5, timeout)).mapTo[String] val ffive = (ref ? (5, timeout)).mapTo[String]

View file

@ -12,16 +12,16 @@ object ActorRegistrySpec {
class TestActor extends Actor { class TestActor extends Actor {
def receive = { def receive = {
case "ping" case "ping"
self.reply("got ping") reply("got ping")
} }
} }
class TestActor2 extends Actor { class TestActor2 extends Actor {
def receive = { def receive = {
case "ping" case "ping"
self.reply("got ping") reply("got ping")
case "ping2" case "ping2"
self.reply("got ping") reply("got ping")
} }
} }
} }

View file

@ -33,7 +33,7 @@ object ActorRestartSpec {
def receive = { def receive = {
case x: Int xx = x case x: Int xx = x
case t: RestartType restart = t case t: RestartType restart = t
case "get" self reply xx case "get" reply(xx)
} }
override def preStart { testActor ! (("preStart", gen)) } override def preStart { testActor ! (("preStart", gen)) }
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) } override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) }

View file

@ -44,7 +44,7 @@ object Chameneos {
case Exit case Exit
colour = FADED colour = FADED
self.sender.get ! MeetingCount(meetings) sender.get ! MeetingCount(meetings)
} }
def complement(otherColour: Colour): Colour = colour match { def complement(otherColour: Colour): Colour = colour match {
@ -97,11 +97,11 @@ object Chameneos {
n -= 1 n -= 1
chameneo ! msg chameneo ! msg
waitingChameneo = None waitingChameneo = None
case None waitingChameneo = self.sender case None waitingChameneo = sender
} }
} else { } else {
waitingChameneo.foreach(_ ! Exit) waitingChameneo.foreach(_ ! Exit)
self.channel ! Exit channel ! Exit
} }
} }
} }

View file

@ -18,7 +18,7 @@ object ForwardActorSpec {
def createForwardingChain(): ActorRef = { def createForwardingChain(): ActorRef = {
val replier = actorOf(new Actor { val replier = actorOf(new Actor {
def receive = { case x self reply x } def receive = { case x reply(x) }
}) })
def mkforwarder(forwardTo: ActorRef) = actorOf( def mkforwarder(forwardTo: ActorRef) = actorOf(

View file

@ -48,7 +48,7 @@ object IOActorSpec {
def receiveIO = { def receiveIO = {
case length: Int case length: Int
val bytes = socket.read(length) val bytes = socket.read(length)
self reply bytes reply(bytes)
} }
} }
} }
@ -107,9 +107,9 @@ object IOActorSpec {
case msg: NewClient createWorker forward msg case msg: NewClient createWorker forward msg
case ('set, key: String, value: ByteString) case ('set, key: String, value: ByteString)
kvs += (key -> value) kvs += (key -> value)
self tryReply (()) tryReply(())
case ('get, key: String) self tryReply kvs.get(key) case ('get, key: String) tryReply(kvs.get(key))
case 'getall self tryReply kvs case 'getall tryReply(kvs)
} }
} }
@ -125,15 +125,15 @@ object IOActorSpec {
def receiveIO = { def receiveIO = {
case ('set, key: String, value: ByteString) case ('set, key: String, value: ByteString)
socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value) socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
self tryReply readResult tryReply(readResult)
case ('get, key: String) case ('get, key: String)
socket write ByteString("GET " + key + "\r\n") socket write ByteString("GET " + key + "\r\n")
self tryReply readResult tryReply(readResult)
case 'getall case 'getall
socket write ByteString("GETALL\r\n") socket write ByteString("GETALL\r\n")
self tryReply readResult tryReply(readResult)
} }
def readResult = { def readResult = {

View file

@ -75,7 +75,7 @@ class LoggingReceiveSpec
f.setBoolean(Actor, true) f.setBoolean(Actor, true)
val actor = TestActorRef(new Actor { val actor = TestActorRef(new Actor {
def receive = loggable(this) { def receive = loggable(this) {
case _ self reply "x" case _ reply("x")
} }
}) })
actor ! "buh" actor ! "buh"
@ -102,7 +102,7 @@ class LoggingReceiveSpec
f.setBoolean(Actor, true) f.setBoolean(Actor, true)
val actor = TestActorRef(new Actor { val actor = TestActorRef(new Actor {
def receive = loggable(this)(loggable(this) { def receive = loggable(this)(loggable(this) {
case _ self reply "x" case _ reply("x")
}) })
}) })
actor ! "buh" actor ! "buh"

View file

@ -22,7 +22,7 @@ class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
val timeoutLatch = TestLatch() val timeoutLatch = TestLatch()
val timeoutActor = actorOf(new Actor { val timeoutActor = actorOf(new Actor {
self.receiveTimeout = Some(500L) receiveTimeout = Some(500L)
protected def receive = { protected def receive = {
case ReceiveTimeout timeoutLatch.open case ReceiveTimeout timeoutLatch.open
@ -37,7 +37,7 @@ class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
val timeoutLatch = TestLatch() val timeoutLatch = TestLatch()
val timeoutActor = actorOf(new Actor { val timeoutActor = actorOf(new Actor {
self.receiveTimeout = Some(500L) receiveTimeout = Some(500L)
protected def receive = { protected def receive = {
case ReceiveTimeout timeoutLatch.open case ReceiveTimeout timeoutLatch.open
@ -61,7 +61,7 @@ class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
case object Tick case object Tick
val timeoutActor = actorOf(new Actor { val timeoutActor = actorOf(new Actor {
self.receiveTimeout = Some(500L) receiveTimeout = Some(500L)
protected def receive = { protected def receive = {
case Tick () case Tick ()
@ -81,14 +81,14 @@ class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
case object Tick case object Tick
val timeoutActor = actorOf(new Actor { val timeoutActor = actorOf(new Actor {
self.receiveTimeout = Some(500L) receiveTimeout = Some(500L)
protected def receive = { protected def receive = {
case Tick () case Tick ()
case ReceiveTimeout case ReceiveTimeout
count.incrementAndGet count.incrementAndGet
timeoutLatch.open timeoutLatch.open
self.receiveTimeout = None receiveTimeout = None
} }
}) })

View file

@ -45,7 +45,7 @@ object SupervisorSpec {
def receive = { def receive = {
case Ping case Ping
messageLog.put(PingMessage) messageLog.put(PingMessage)
self.tryReply(PongMessage) tryReply(PongMessage)
case Die case Die
throw new RuntimeException(ExceptionMessage) throw new RuntimeException(ExceptionMessage)
} }
@ -366,7 +366,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!") if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
def receive = { def receive = {
case Ping self.tryReply(PongMessage) case Ping tryReply(PongMessage)
case Die throw new RuntimeException("Expected") case Die throw new RuntimeException("Expected")
} }
})) }))

View file

@ -69,11 +69,11 @@ object Ticket669Spec {
} }
override def preRestart(reason: scala.Throwable, msg: Option[Any]) { override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("failure1") tryReply("failure1")
} }
override def postStop() { override def postStop() {
self.tryReply("failure2") tryReply("failure2")
} }
} }
} }

View file

@ -52,18 +52,18 @@ object ActorModelSpec {
class DispatcherActor extends Actor { class DispatcherActor extends Actor {
private val busy = new Switch(false) private val busy = new Switch(false)
def dispatcher = self.dispatcher.asInstanceOf[MessageDispatcherInterceptor] def interceptor = dispatcher.asInstanceOf[MessageDispatcherInterceptor]
def ack { def ack {
if (!busy.switchOn()) { if (!busy.switchOn()) {
throw new Exception("isolation violated") throw new Exception("isolation violated")
} else { } else {
dispatcher.getStats(self).msgsProcessed.incrementAndGet() interceptor.getStats(self).msgsProcessed.incrementAndGet()
} }
} }
override def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
dispatcher.getStats(self).restarts.incrementAndGet() interceptor.getStats(self).restarts.incrementAndGet()
} }
def receive = { def receive = {
@ -71,8 +71,8 @@ object ActorModelSpec {
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff() case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff() case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff() case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff() case Reply(msg) ack; reply(msg); busy.switchOff()
case TryReply(msg) ack; self.tryReply(msg); busy.switchOff() case TryReply(msg) ack; tryReply(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff() case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff() case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff() case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
@ -184,7 +184,7 @@ object ActorModelSpec {
msgsReceived: Long = statsFor(actorRef).msgsReceived.get(), msgsReceived: Long = statsFor(actorRef).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(),
restarts: Long = statsFor(actorRef).restarts.get()) { restarts: Long = statsFor(actorRef).restarts.get()) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[SelfActorRef].dispatcher)) val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher))
assert(stats.suspensions.get() === suspensions, "Suspensions") assert(stats.suspensions.get() === suspensions, "Suspensions")
assert(stats.resumes.get() === resumes, "Resumes") assert(stats.resumes.get() === resumes, "Resumes")
assert(stats.registers.get() === registers, "Registers") assert(stats.registers.get() === registers, "Registers")

View file

@ -12,7 +12,7 @@ import akka.actor.{ Props, Actor }
object DispatcherActorSpec { object DispatcherActorSpec {
class TestActor extends Actor { class TestActor extends Actor {
def receive = { def receive = {
case "Hello" self.reply("World") case "Hello" reply("World")
case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance") case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance")
} }
} }

View file

@ -15,7 +15,7 @@ import akka.actor.{ Props, Actor }
object PinnedActorSpec { object PinnedActorSpec {
class TestActor extends Actor { class TestActor extends Actor {
def receive = { def receive = {
case "Hello" self.reply("World") case "Hello" reply("World")
case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance") case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance")
} }
} }

View file

@ -21,7 +21,7 @@ object FutureSpec {
class TestActor extends Actor { class TestActor extends Actor {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World") reply("World")
case "NoReply" {} case "NoReply" {}
case "Failure" case "Failure"
throw new RuntimeException("Expected exception; to test fault-tolerance") throw new RuntimeException("Expected exception; to test fault-tolerance")
@ -32,7 +32,7 @@ object FutureSpec {
def receive = { def receive = {
case "Hello" case "Hello"
await.await await.await
self.reply("World") reply("World")
case "NoReply" { await.await } case "NoReply" { await.await }
case "Failure" case "Failure"
await.await await.await
@ -143,7 +143,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"will return a result" must { "will return a result" must {
behave like futureWithResult { test behave like futureWithResult { test
val actor1 = actorOf[TestActor] val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String self reply s.toUpperCase } }) val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.toUpperCase) } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s } val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await future.await
test(future, "WORLD") test(future, "WORLD")
@ -155,7 +155,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
behave like futureWithException[ArithmeticException] { test behave like futureWithException[ArithmeticException] { test
filterException[ArithmeticException] { filterException[ArithmeticException] {
val actor1 = actorOf[TestActor] val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String self reply (s.length / 0) } }) val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.length / 0) } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s } val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await future.await
test(future, "/ by zero") test(future, "/ by zero")
@ -168,7 +168,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
behave like futureWithException[MatchError] { test behave like futureWithException[MatchError] { test
filterException[MatchError] { filterException[MatchError] {
val actor1 = actorOf[TestActor] val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String self reply s.toUpperCase } }) val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.toUpperCase) } })
val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i } val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i }
future.await future.await
test(future, "World (of class java.lang.String)") test(future, "World (of class java.lang.String)")
@ -185,8 +185,8 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
filterException[ClassCastException] { filterException[ClassCastException] {
val actor = actorOf(new Actor { val actor = actorOf(new Actor {
def receive = { def receive = {
case s: String self reply s.length case s: String reply(s.length)
case i: Int self reply (i * 2).toString case i: Int reply((i * 2).toString)
} }
}) })
@ -217,8 +217,8 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
case class Res[T](res: T) case class Res[T](res: T)
val actor = actorOf(new Actor { val actor = actorOf(new Actor {
def receive = { def receive = {
case Req(s: String) self reply Res(s.length) case Req(s: String) reply(Res(s.length))
case Req(i: Int) self reply Res((i * 2).toString) case Req(i: Int) reply(Res((i * 2).toString))
} }
}) })
@ -304,7 +304,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"fold" in { "fold" in {
val actors = (1 to 10).toList map { _ val actors = (1 to 10).toList map { _
actorOf(new Actor { actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add } def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
}) })
} }
val timeout = 10000 val timeout = 10000
@ -315,7 +315,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"fold by composing" in { "fold by composing" in {
val actors = (1 to 10).toList map { _ val actors = (1 to 10).toList map { _
actorOf(new Actor { actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add } def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
}) })
} }
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
@ -330,7 +330,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
case (add: Int, wait: Int) case (add: Int, wait: Int)
Thread.sleep(wait) Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add tryReply(add)
} }
}) })
} }
@ -362,7 +362,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"shouldReduceResults" in { "shouldReduceResults" in {
val actors = (1 to 10).toList map { _ val actors = (1 to 10).toList map { _
actorOf(new Actor { actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add } def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
}) })
} }
val timeout = 10000 val timeout = 10000
@ -378,7 +378,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
case (add: Int, wait: Int) case (add: Int, wait: Int)
Thread.sleep(wait) Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add tryReply(add)
} }
}) })
} }
@ -407,7 +407,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
var counter = 1 var counter = 1
def receive = { def receive = {
case 'GetNext case 'GetNext
self reply counter reply(counter)
counter += 2 counter += 2
} }
}) })

View file

@ -31,7 +31,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
def receive = { def receive = {
case i: Int acc = i :: acc case i: Int acc = i :: acc
case 'Result self tryReply acc case 'Result tryReply(acc)
} }
}).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef] }).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef]

View file

@ -48,7 +48,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook])
} }
def done(status: Boolean) { def done(status: Boolean) {
self.channel ! new Rsp(status) channel ! new Rsp(status)
} }
def waitForStandby(pendingStandbyFuture: Future[_]) { def waitForStandby(pendingStandbyFuture: Future[_]) {

View file

@ -42,7 +42,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver {
m.forward(order) m.forward(order)
case None case None
EventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) EventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol)
self.channel ! new Rsp(false) channel ! new Rsp(false)
} }
} }
} }

View file

@ -43,7 +43,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
case _ case _
count.incrementAndGet count.incrementAndGet
latch.countDown() latch.countDown()
self tryReply "success" tryReply("success")
} }
}) })
@ -92,7 +92,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
def receive = { def receive = {
case req: String { case req: String {
sleepFor(10 millis) sleepFor(10 millis)
self.tryReply("Response") tryReply("Response")
} }
} }
}) })

View file

@ -179,7 +179,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
case Stop(None) self.stop(); shudownLatch.map(_.countDown()) case Stop(None) self.stop(); shudownLatch.map(_.countDown())
case Stop(Some(_id)) if (_id == id) self.stop(); shudownLatch.map(_.countDown()) case Stop(Some(_id)) if (_id == id) self.stop(); shudownLatch.map(_.countDown())
case _id: Int if (_id == id) case _id: Int if (_id == id)
case _ Thread sleep 100 * id; self tryReply id case _ Thread sleep 100 * id; tryReply(id)
} }
}) })

View file

@ -25,7 +25,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
def receive = { def receive = {
case req: String case req: String
Thread.sleep(6000L) Thread.sleep(6000L)
self.tryReply("Response") tryReply("Response")
} }
}) })
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000))) }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000)))

View file

@ -25,6 +25,7 @@ import com.eaio.uuid.UUID
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.{ Collection JCollection }
/** /**
* Marker trait to show which Messages are automatically handled by Akka * Marker trait to show which Messages are automatically handled by Akka
@ -36,7 +37,7 @@ sealed trait AutoReceivedMessage extends Serializable
*/ */
sealed trait LifeCycleMessage extends Serializable { self: AutoReceivedMessage } sealed trait LifeCycleMessage extends Serializable { self: AutoReceivedMessage }
case class HotSwap(code: SelfActorRef Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage { case class HotSwap(code: ActorRef Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage {
/** /**
* Java API * Java API
@ -403,43 +404,12 @@ object Actor {
* <p/> * <p/>
* An actor has a well-defined (non-cyclic) life-cycle. * An actor has a well-defined (non-cyclic) life-cycle.
* <pre> * <pre>
* => NEW (newly created actor) - can't receive messages (yet) * => RUNNING (created and started actor) - can receive messages
* => STARTED (when 'start' is invoked) - can receive messages * => SHUTDOWN (when 'stop' or 'exit' is invoked) - can't do anything
* => SHUT DOWN (when 'exit' is invoked) - can't do anything
* </pre> * </pre>
* *
* <p/> * <p/>
* The Actor's API is available in the 'self' member variable. * The Actor's own ActorRef is available in the 'self' member variable.
*
* <p/>
* Here you find functions like:
* - !, ? and forward
* - link, unlink etc
* - start, stop
* - etc.
*
* <p/>
* Here you also find fields like
* - dispatcher = ...
* - id = ...
* - lifeCycle = ...
* - faultHandler = ...
* - trapExit = ...
* - etc.
*
* <p/>
* This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt>
*
* However, for convenience you can import these functions and fields like below, which will allow you do
* drop the 'self' prefix:
* <pre>
* class MyActor extends Actor {
* import self._
* id = ...
* dispatcher = ...
* ...
* }
* </pre>
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ -456,7 +426,7 @@ trait Actor {
* Stores the context for this actor, including self, sender, and hotswap. * Stores the context for this actor, including self, sender, and hotswap.
*/ */
@transient @transient
private[akka] val actorContext: ActorContext = { private[akka] val context: ActorContext = {
val contextStack = ActorInstance.contextStack.get val contextStack = ActorInstance.contextStack.get
def noContextError = { def noContextError = {
@ -480,7 +450,7 @@ trait Actor {
* Mainly for internal use, functions as the implicit sender references when invoking * Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function. * the 'forward' function.
*/ */
def someSelf: Some[ScalaActorRef with SelfActorRef] = Some(actorContext.self) def someSelf: Some[ActorRef with ScalaActorRef] = Some(context.self)
/* /*
* Option[ActorRef] representation of the 'self' ActorRef reference. * Option[ActorRef] representation of the 'self' ActorRef reference.
@ -488,7 +458,7 @@ trait Actor {
* Mainly for internal use, functions as the implicit sender references when invoking * Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!' and '?'). * one of the message send functions ('!' and '?').
*/ */
def optionSelf: Option[ScalaActorRef with SelfActorRef] = someSelf def optionSelf: Option[ActorRef with ScalaActorRef] = someSelf
/** /**
* The 'self' field holds the ActorRef for this actor. * The 'self' field holds the ActorRef for this actor.
@ -497,27 +467,79 @@ trait Actor {
* <pre> * <pre>
* self ! message * self ! message
* </pre> * </pre>
* Here you also find most of the Actor API.
* <p/>
* For example fields like:
* <pre>
* self.dispatcher = ...
* self.faultHandler = ...
* self.lifeCycle = ...
* self.sender
* </pre>
* <p/>
* Here you also find methods like:
* <pre>
* self.reply(..)
* self.link(..)
* self.unlink(..)
* self.stop(..)
* </pre>
*/ */
@transient
implicit def self = someSelf.get implicit def self = someSelf.get
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
def sender: Option[ActorRef] = context.sender
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
def senderFuture(): Option[Promise[Any]] = context.senderFuture
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
def channel: UntypedChannel = context.channel
// just for current compatibility
implicit def forwardable: ForwardableChannel = ForwardableChannel(channel)
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout: Option[Long] = context.receiveTimeout
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout_=(timeout: Option[Long]) = context.receiveTimeout = timeout
/**
* Akka Scala & Java API
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed. This method fails if the original sender of the message could not be determined with an
* IllegalStateException.
*
* If you don't want deal with this IllegalStateException, but just a boolean, just use the <code>tryReply(...)</code>
* version.
*
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = channel.!(message)(self)
/**
* Akka Scala & Java API
* Use <code>tryReply(..)</code> to try reply with a message to the original sender of the message currently
* being processed. This method
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*
* If you would rather have an exception, check the <code>reply(..)</code> version.
*/
def tryReply(message: Any): Boolean = channel.tryTell(message)(self)
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def linkedActors: JCollection[ActorRef] = context.linkedActors
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def dispatcher: MessageDispatcher = context.dispatcher
/** /**
* User overridable callback/setting. * User overridable callback/setting.
* <p/> * <p/>
@ -529,7 +551,7 @@ trait Actor {
* def receive = { * def receive = {
* case Ping =&gt; * case Ping =&gt;
* println("got a 'Ping' message") * println("got a 'Ping' message")
* self.reply("pong") * reply("pong")
* *
* case OneWay =&gt; * case OneWay =&gt;
* println("got a 'OneWay' message") * println("got a 'OneWay' message")
@ -601,15 +623,15 @@ trait Actor {
*/ */
def become(behavior: Receive, discardOld: Boolean = true) { def become(behavior: Receive, discardOld: Boolean = true) {
if (discardOld) unbecome() if (discardOld) unbecome()
self.hotswap = self.hotswap.push(behavior) context.hotswap = context.hotswap.push(behavior)
} }
/** /**
* Reverts the Actor behavior to the previous one in the hotswap stack. * Reverts the Actor behavior to the previous one in the hotswap stack.
*/ */
def unbecome() { def unbecome() {
val h = self.hotswap val h = context.hotswap
if (h.nonEmpty) self.hotswap = h.pop if (h.nonEmpty) context.hotswap = h.pop
} }
// ========================================= // =========================================
@ -618,7 +640,7 @@ trait Actor {
private[akka] final def apply(msg: Any) = { private[akka] final def apply(msg: Any) = {
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null)) if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
throw new InvalidMessageException("Message from [" + self.channel + "] to [" + self.toString + "] is null") throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null")
def autoReceiveMessage(msg: AutoReceivedMessage): Boolean = { def autoReceiveMessage(msg: AutoReceivedMessage): Boolean = {
if (debugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg) if (debugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg)
@ -635,17 +657,17 @@ trait Actor {
*/ */
msg match { msg match {
case Init self.reply(()); false //All gud nao FIXME remove reply when we can have fully async init case Init reply(()); false //All gud nao FIXME remove reply when we can have fully async init
case HotSwap(code, discardOld) become(code(self), discardOld); false case HotSwap(code, discardOld) become(code(self), discardOld); false
case RevertHotSwap unbecome(); false case RevertHotSwap unbecome(); false
case d: Death self.handleDeath(d); false case d: Death context.handleDeath(d); false
case Link(child) self.link(child); false case Link(child) self.link(child); false
case Unlink(child) self.unlink(child); false case Unlink(child) self.unlink(child); false
case UnlinkAndStop(child) self.unlink(child); child.stop(); false case UnlinkAndStop(child) self.unlink(child); child.stop(); false
case Restart(reason) throw reason case Restart(reason) throw reason
case Kill throw new ActorKilledException("Kill") case Kill throw new ActorKilledException("Kill")
case PoisonPill case PoisonPill
val ch = self.channel val ch = channel
self.stop() self.stop()
ch.sendException(new ActorKilledException("PoisonPill")) ch.sendException(new ActorKilledException("PoisonPill"))
false false
@ -655,7 +677,7 @@ trait Actor {
if (msg.isInstanceOf[AutoReceivedMessage]) if (msg.isInstanceOf[AutoReceivedMessage])
autoReceiveMessage(msg.asInstanceOf[AutoReceivedMessage]) autoReceiveMessage(msg.asInstanceOf[AutoReceivedMessage])
else { else {
val behaviorStack = self.hotswap val behaviorStack = context.hotswap
msg match { msg match {
case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) behaviorStack.head.apply(msg) case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) behaviorStack.head.apply(msg)
case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) processingBehavior.apply(msg) case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) processingBehavior.apply(msg)

View file

@ -1,13 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
/**
* Everything that gets injected into the actor.
* Just a wrapper on self for now.
*/
private[akka] class ActorContext(val self: LocalActorRef) {
}

View file

@ -14,6 +14,40 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
/**
* The actor context - the view into the actor instance from the actor.
* Exposes contextual information for the actor and the current message.
* TODO: everything here for current compatibility - could be limited more
*/
private[akka] trait ActorContext {
def self: ActorRef with ScalaActorRef
def receiveTimeout: Option[Long]
def receiveTimeout_=(timeout: Option[Long]): Unit
def hotswap: Stack[PartialFunction[Any, Unit]]
def hotswap_=(stack: Stack[PartialFunction[Any, Unit]]): Unit
def currentMessage: MessageInvocation
def currentMessage_=(invocation: MessageInvocation): Unit
def sender: Option[ActorRef]
def senderFuture(): Option[Promise[Any]]
def channel: UntypedChannel
def linkedActors: JCollection[ActorRef]
def dispatcher: MessageDispatcher
def handleDeath(death: Death)
}
private[akka] object ActorInstance { private[akka] object ActorInstance {
sealed trait Status sealed trait Status
object Status { object Status {
@ -26,7 +60,13 @@ private[akka] object ActorInstance {
} }
} }
private[akka] class ActorInstance(props: Props, self: LocalActorRef) { private[akka] class ActorInstance(
val self: ActorRef with ScalaActorRef,
props: Props,
_receiveTimeout: Option[Long],
_hotswap: Stack[PartialFunction[Any, Unit]])
extends ActorContext {
import ActorInstance._ import ActorInstance._
val guard = new ReentrantGuard // TODO: remove this last synchronization point val guard = new ReentrantGuard // TODO: remove this last synchronization point
@ -52,9 +92,18 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
@volatile @volatile
lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
@volatile
var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility
@volatile
var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility
@volatile
var currentMessage: MessageInvocation = null
val actor: AtomicReference[Actor] = new AtomicReference[Actor]() val actor: AtomicReference[Actor] = new AtomicReference[Actor]()
def ref: ActorRef = self def ref: ActorRef with ScalaActorRef = self
def uuid: Uuid = self.uuid def uuid: Uuid = self.uuid
@ -74,7 +123,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
def newActor(restart: Boolean): Actor = { def newActor(restart: Boolean): Actor = {
val stackBefore = contextStack.get val stackBefore = contextStack.get
contextStack.set(stackBefore.push(new ActorContext(self))) contextStack.set(stackBefore.push(this))
try { try {
if (restart) { if (restart) {
val a = actor.get() val a = actor.get()
@ -106,7 +155,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
private[akka] def stop(): Unit = guard.withGuard { private[akka] def stop(): Unit = guard.withGuard {
if (isRunning) { if (isRunning) {
self.receiveTimeout = None receiveTimeout = None
cancelReceiveTimeout cancelReceiveTimeout
Actor.registry.unregister(self) Actor.registry.unregister(self)
status = Status.Shutdown status = Status.Shutdown
@ -126,7 +175,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
} finally { } finally {
//if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false) //if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false)
self.currentMessage = null currentMessage = null
clearActorContext() clearActorContext()
} }
} }
@ -164,24 +213,6 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
def sender: Option[ActorRef] = {
val msg = self.currentMessage
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
}
}
def senderFuture(): Option[Promise[Any]] = {
val msg = self.currentMessage
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
}
}
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel) if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel)
else throw new ActorInitializationException("Actor " + self + " is dead") else throw new ActorInitializationException("Actor " + self + " is dead")
@ -198,11 +229,34 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
future future
} else throw new ActorInitializationException("Actor " + self + " is dead") } else throw new ActorInitializationException("Actor " + self + " is dead")
def sender: Option[ActorRef] = {
val msg = currentMessage
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
}
}
def senderFuture(): Option[Promise[Any]] = {
val msg = currentMessage
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
}
}
def channel: UntypedChannel = currentMessage match {
case null NullChannel
case msg msg.channel
}
def invoke(messageHandle: MessageInvocation): Unit = { def invoke(messageHandle: MessageInvocation): Unit = {
guard.lock.lock() guard.lock.lock()
try { try {
if (!isShutdown) { if (!isShutdown) {
self.currentMessage = messageHandle currentMessage = messageHandle
try { try {
try { try {
cancelReceiveTimeout() // FIXME: leave this here? cancelReceiveTimeout() // FIXME: leave this here?
@ -218,7 +272,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
} }
a.apply(messageHandle.message) a.apply(messageHandle.message)
self.currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch {
case e case e
EventHandler.error(e, self, e.getMessage) EventHandler.error(e, self, e.getMessage)
@ -226,7 +280,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
// prevent any further messages to be processed until the actor has been restarted // prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
self.channel.sendException(e) channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this) if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this)
@ -271,7 +325,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
def performRestart() { def performRestart() {
val failedActor = actor.get val failedActor = actor.get
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
val message = if (self.currentMessage ne null) Some(self.currentMessage.message) else None val message = if (currentMessage ne null) Some(currentMessage.message) else None
if (failedActor ne null) failedActor.preRestart(reason, message) if (failedActor ne null) failedActor.preRestart(reason, message)
val freshActor = newActor(restart = true) val freshActor = newActor(restart = true)
clearActorContext() clearActorContext()
@ -293,7 +347,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString)) EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry false // an error or exception here should trigger a retry
} finally { } finally {
self.currentMessage = null currentMessage = null
} }
if (success) { if (success) {
@ -374,7 +428,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
def checkReceiveTimeout() { def checkReceiveTimeout() {
cancelReceiveTimeout() cancelReceiveTimeout()
val recvtimeout = self.receiveTimeout val recvtimeout = receiveTimeout
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed //Only reschedule if desired and there are currently no more messages to be processed
futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
@ -394,7 +448,7 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
@tailrec @tailrec
def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, newContext: ActorContext): Boolean = { def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, newContext: ActorContext): Boolean = {
val success = try { val success = try {
val contextField = clazz.getDeclaredField("actorContext") val contextField = clazz.getDeclaredField("context")
contextField.setAccessible(true) contextField.setAccessible(true)
contextField.set(actor, newContext) contextField.set(actor, newContext)
true true
@ -413,4 +467,13 @@ private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
lookupAndSetSelfFields(actor.get.getClass, actor.get, newContext) lookupAndSetSelfFields(actor.get.getClass, actor.get, newContext)
} }
override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override def equals(that: Any): Boolean = {
that.isInstanceOf[ActorInstance] && that.asInstanceOf[ActorInstance].uuid == uuid
}
override def toString = "ActorInstance[%s]".format(uuid)
} }

View file

@ -11,7 +11,6 @@ import akka.serialization.{ Serializer, Serialization }
import ReflectiveAccess._ import ReflectiveAccess._
import ClusterModule._ import ClusterModule._
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.{ Collection JCollection }
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
import java.lang.{ UnsupportedOperationException, IllegalStateException } import java.lang.{ UnsupportedOperationException, IllegalStateException }
@ -99,9 +98,9 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
* Akka Java API. <p/> * Akka Java API. <p/>
* Forwards the message specified to this actor and preserves the original sender of the message * Forwards the message specified to this actor and preserves the original sender of the message
*/ */
def forward(message: AnyRef, sender: SelfActorRef) { def forward(message: AnyRef, sender: ActorRef) {
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(sender) else forward(message)(ForwardableChannel(sender))
} }
/** /**
@ -182,141 +181,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
override def toString = "Actor[%s]".format(address) override def toString = "Actor[%s]".format(address)
} }
abstract class SelfActorRef extends ActorRef with ForwardableChannel { self: LocalActorRef with ScalaActorRef
/**
* Holds the hot swapped partial function.
* WARNING: DO NOT USE THIS, IT IS INTERNAL AKKA USE ONLY
*/
@volatile
protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]()
/**
* This is a reference to the message currently being processed by the actor
*/
@volatile
protected[akka] var currentMessage: MessageInvocation = null
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile
var receiveTimeout: Option[Long] = None
/**
* Akka Java API. <p/>
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSender: Option[ActorRef] = sender
/**
* Akka Java API. <p/>
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSenderFuture: Option[Promise[Any]] = senderFuture
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def sender: Option[ActorRef]
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def senderFuture(): Option[Promise[Any]]
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
def channel: UntypedChannel = self.currentMessage match {
case null NullChannel
case msg msg.channel
}
/**
* Akka Java API. <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def setReceiveTimeout(timeout: Long): Unit = this.receiveTimeout = Some(timeout)
/**
* Akka Java API. <p/>
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
* Java API. <p/>
* Abstraction for unification of sender and senderFuture for later reply
*/
def getChannel: UntypedChannel = channel
/**
* Akka Scala & Java API
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed. This method fails if the original sender of the message could not be determined with an
* IllegalStateException.
*
* If you don't want deal with this IllegalStateException, but just a boolean, just use the <code>tryReply(...)</code>
* version.
*
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = channel.!(message)(this)
/**
* Akka Scala & Java API
* Use <code>tryReply(..)</code> to try reply with a message to the original sender of the message currently
* being processed. This method
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*
* If you would rather have an exception, check the <code>reply(..)</code> version.
*/
def tryReply(message: Any): Boolean = channel.tryTell(message)(this)
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def linkedActors: JCollection[ActorRef]
/**
* Java API. <p/>
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getLinkedActors: JCollection[ActorRef] = linkedActors
/**
* Scala API
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def dispatcher: MessageDispatcher
/**
* Java API
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
final def getDispatcher(): MessageDispatcher = dispatcher
/** INTERNAL API ONLY **/
protected[akka] def handleDeath(death: Death): Unit
}
/** /**
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
* *
@ -325,9 +189,11 @@ abstract class SelfActorRef extends ActorRef with ForwardableChannel { self: Loc
class LocalActorRef private[akka] ( class LocalActorRef private[akka] (
private[this] val props: Props, private[this] val props: Props,
val address: String, val address: String,
val systemService: Boolean, val systemService: Boolean = false,
override private[akka] val uuid: Uuid = newUuid) override private[akka] val uuid: Uuid = newUuid,
extends SelfActorRef with ScalaActorRef { receiveTimeout: Option[Long] = None,
hotswap: Stack[PartialFunction[Any, Unit]] = Stack.empty)
extends ActorRef with ScalaActorRef {
// used only for deserialization // used only for deserialization
private[akka] def this( private[akka] def this(
@ -337,15 +203,13 @@ class LocalActorRef private[akka] (
__receiveTimeout: Option[Long], __receiveTimeout: Option[Long],
__hotswap: Stack[PartialFunction[Any, Unit]]) = { __hotswap: Stack[PartialFunction[Any, Unit]]) = {
this(__props, __address, false, __uuid) //Doesn't make any sense to move a system service this(__props, __address, false, __uuid, __receiveTimeout, __hotswap)
hotswap = __hotswap actorInstance.setActorContext(actorInstance) // this is needed for deserialization - why?
receiveTimeout = __receiveTimeout
actorInstance.setActorContext(new ActorContext(this)) // this is needed for deserialization - why?
} }
private[this] val actorInstance = new ActorInstance(props, this) private[this] val actorInstance = new ActorInstance(this, props, receiveTimeout, hotswap)
actorInstance.start() //Nonsense actorInstance.start()
/** /**
* Is the actor running? * Is the actor running?
@ -357,11 +221,6 @@ class LocalActorRef private[akka] (
*/ */
def isShutdown: Boolean = actorInstance.isShutdown def isShutdown: Boolean = actorInstance.isShutdown
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def dispatcher: MessageDispatcher = props.dispatcher
/** /**
* Suspends the actor. It will not process messages while suspended. * Suspends the actor. It will not process messages while suspended.
*/ */
@ -398,30 +257,11 @@ class LocalActorRef private[akka] (
*/ */
def unlink(actorRef: ActorRef): ActorRef = actorInstance.unlink(actorRef) def unlink(actorRef: ActorRef): ActorRef = actorInstance.unlink(actorRef)
/**
* Returns an unmodifiable Java Collection containing the linked actors
*/
def linkedActors: JCollection[ActorRef] = actorInstance.linkedActors
/** /**
* Returns the supervisor, if there is one. * Returns the supervisor, if there is one.
*/ */
def supervisor: Option[ActorRef] = actorInstance.supervisor def supervisor: Option[ActorRef] = actorInstance.supervisor
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def sender: Option[ActorRef] = actorInstance.sender
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def senderFuture(): Option[Promise[Any]] = actorInstance.senderFuture
// ========= AKKA PROTECTED FUNCTIONS ========= // ========= AKKA PROTECTED FUNCTIONS =========
protected[akka] def actorClass: Class[_] = actorInstance.actorClass protected[akka] def actorClass: Class[_] = actorInstance.actorClass
@ -600,7 +440,7 @@ trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRe
* <p/> * <p/>
* Works with '!' and '?'/'ask'. * Works with '!' and '?'/'ask'.
*/ */
def forward(message: Any)(implicit channel: ForwardableChannel) = postMessageToMailbox(message, channel.channel) def forward(message: Any)(implicit forwardable: ForwardableChannel) = postMessageToMailbox(message, forwardable.channel)
} }
/** /**

View file

@ -141,21 +141,6 @@ case object NullChannel extends UntypedChannel {
} }
/** /**
* A channel which may be forwarded: a message received with such a reply * Wraps a forwardable channel. Used implicitly by ScalaActorRef.forward
* channel attached can be passed on transparently such that a reply from a
* later processing stage is sent directly back to the origin. Keep in mind
* that not all channels can be used multiple times.
*/ */
trait ForwardableChannel extends UntypedChannel with AvailableChannel[Any] { case class ForwardableChannel(val channel: UntypedChannel)
/**
* Get channel by which this channel would reply (ActorRef.forward takes an
* implicit ForwardableChannel and uses its .channel as message origin)
*/
def channel: UntypedChannel
}
object ForwardableChannel {
implicit def someS2FC(sender: Some[SelfActorRef]): ForwardableChannel = sender.get
implicit def someIS2FC(implicit sender: Some[SelfActorRef]): ForwardableChannel = sender.get
}

View file

@ -483,7 +483,7 @@ trait FSM[S, D] extends ListenerManagement {
timeoutFuture = None timeoutFuture = None
} }
generation += 1 generation += 1
processMsg(value, self.channel) processMsg(value, channel)
} }
} }
@ -507,7 +507,7 @@ trait FSM[S, D] extends ListenerManagement {
nextState.stopReason match { nextState.stopReason match {
case None makeTransition(nextState) case None makeTransition(nextState)
case _ case _
nextState.replies.reverse foreach (self reply _) nextState.replies.reverse foreach reply
terminate(nextState) terminate(nextState)
self.stop() self.stop()
} }
@ -517,7 +517,7 @@ trait FSM[S, D] extends ListenerManagement {
if (!stateFunctions.contains(nextState.stateName)) { if (!stateFunctions.contains(nextState.stateName)) {
terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName))) terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
} else { } else {
nextState.replies.reverse foreach (self reply _) nextState.replies.reverse foreach reply
if (currentState.stateName != nextState.stateName) { if (currentState.stateName != nextState.stateName) {
handleTransition(currentState.stateName, nextState.stateName) handleTransition(currentState.stateName, nextState.stateName)
notifyListeners(Transition(self, currentState.stateName, nextState.stateName)) notifyListeners(Transition(self, currentState.stateName, nextState.stateName))

View file

@ -50,15 +50,15 @@ object IO {
override def asReadable = this override def asReadable = this
def read(len: Int)(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any]) def read(len: Int)(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any])
ByteStringLength(cont, this, actor.self.currentMessage, len) ByteStringLength(cont, this, actor.context.currentMessage, len)
} }
def read()(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any]) def read()(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any])
ByteStringAny(cont, this, actor.self.currentMessage) ByteStringAny(cont, this, actor.context.currentMessage)
} }
def read(delimiter: ByteString, inclusive: Boolean = false)(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any]) def read(delimiter: ByteString, inclusive: Boolean = false)(implicit actor: Actor with IO): ByteString @cps[IOSuspendable[Any]] = shift { cont: (ByteString IOSuspendable[Any])
ByteStringDelimited(cont, this, actor.self.currentMessage, delimiter, inclusive, 0) ByteStringDelimited(cont, this, actor.context.currentMessage, delimiter, inclusive, 0)
} }
} }
@ -176,7 +176,7 @@ trait IO {
} }
run() run()
case msg if _next ne Idle case msg if _next ne Idle
_messages enqueue self.currentMessage _messages enqueue context.currentMessage
case msg if _receiveIO.isDefinedAt(msg) case msg if _receiveIO.isDefinedAt(msg)
_next = reset { _receiveIO(msg); Idle } _next = reset { _receiveIO(msg); Idle }
run() run()
@ -211,7 +211,7 @@ trait IO {
private def run(): Unit = { private def run(): Unit = {
_next match { _next match {
case ByteStringLength(continuation, handle, message, waitingFor) case ByteStringLength(continuation, handle, message, waitingFor)
self.currentMessage = message context.currentMessage = message
val st = state(handle) val st = state(handle)
if (st.readBytes.length >= waitingFor) { if (st.readBytes.length >= waitingFor) {
val bytes = st.readBytes.take(waitingFor) //.compact val bytes = st.readBytes.take(waitingFor) //.compact
@ -220,7 +220,7 @@ trait IO {
run() run()
} }
case bsd @ ByteStringDelimited(continuation, handle, message, delimiter, inclusive, scanned) case bsd @ ByteStringDelimited(continuation, handle, message, delimiter, inclusive, scanned)
self.currentMessage = message context.currentMessage = message
val st = state(handle) val st = state(handle)
val idx = st.readBytes.indexOfSlice(delimiter, scanned) val idx = st.readBytes.indexOfSlice(delimiter, scanned)
if (idx >= 0) { if (idx >= 0) {
@ -233,7 +233,7 @@ trait IO {
_next = bsd.copy(scanned = math.min(idx - delimiter.length, 0)) _next = bsd.copy(scanned = math.min(idx - delimiter.length, 0))
} }
case ByteStringAny(continuation, handle, message) case ByteStringAny(continuation, handle, message)
self.currentMessage = message context.currentMessage = message
val st = state(handle) val st = state(handle)
if (st.readBytes.length > 0) { if (st.readBytes.length > 0) {
val bytes = st.readBytes //.compact val bytes = st.readBytes //.compact

View file

@ -59,7 +59,7 @@ object Props {
*/ */
def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create) def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create)
def apply(behavior: (ScalaActorRef with SelfActorRef) Actor.Receive): Props = apply(new Actor { def receive = behavior(self) }) def apply(behavior: ActorRef Actor.Receive): Props = apply(new Actor { def receive = behavior(self) })
} }
/** /**

View file

@ -167,7 +167,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) Unit) extends Actor { final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) Unit) extends Actor {
override def postStop(): Unit = { override def postStop(): Unit = {
val i = self.linkedActors.iterator val i = linkedActors.iterator
while (i.hasNext) { while (i.hasNext) {
val ref = i.next val ref = i.next
ref.stop() ref.stop()

View file

@ -242,11 +242,11 @@ object TypedActor {
try { try {
if (m.isOneWay) m(me) if (m.isOneWay) m(me)
else if (m.returnsFuture_?) { else if (m.returnsFuture_?) {
self.channel match { channel match {
case p: ActorPromise p completeWith m(me).asInstanceOf[Future[Any]] case p: ActorPromise p completeWith m(me).asInstanceOf[Future[Any]]
case _ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply") case _ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply")
} }
} else self reply m(me) } else reply(m(me))
} finally { selfReference set null } } finally { selfReference set null }
} }

View file

@ -5,6 +5,8 @@
package akka.actor package akka.actor
import akka.japi.{ Creator, Procedure } import akka.japi.{ Creator, Procedure }
import akka.dispatch.{ MessageDispatcher, Promise }
import java.util.{ Collection JCollection }
/** /**
* Subclass this abstract class to create a MDB-style untyped actor. * Subclass this abstract class to create a MDB-style untyped actor.
@ -20,20 +22,20 @@ import akka.japi.{ Creator, Procedure }
* *
* if (msg.equals("UseReply")) { * if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'reply' method * // Reply to original sender of message using the 'reply' method
* getContext().reply(msg + ":" + getContext().getUuid()); * reply(msg + ":" + getSelf().getAddress());
* *
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) { * } else if (msg.equals("UseSender") && getSender().isDefined()) {
* // Reply to original sender of message using the sender reference * // Reply to original sender of message using the sender reference
* // also passing along my own reference (the context) * // also passing along my own reference (the self)
* getContext().getSender().get().tell(msg, context); * getSender().get().tell(msg, getSelf());
* *
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) { * } else if (msg.equals("UseSenderFuture") && getSenderFuture().isDefined()) {
* // Reply to original sender of message using the sender future reference * // Reply to original sender of message using the sender future reference
* getContext().getSenderFuture().get().completeWithResult(msg); * getSenderFuture().get().completeWithResult(msg);
* *
* } else if (msg.equals("SendToSelf")) { * } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively * // Send message to the actor itself recursively
* getContext().tell(msg) * getSelf().tell(msg)
* *
* } else if (msg.equals("ForwardMessage")) { * } else if (msg.equals("ForwardMessage")) {
* // Retreive an actor from the ActorRegistry by ID and get an ActorRef back * // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
@ -62,14 +64,49 @@ abstract class UntypedActor extends Actor {
def onReceive(message: Any): Unit def onReceive(message: Any): Unit
/** /**
* Returns the 'self' reference with the API. * Returns the 'self' reference.
*/ */
def getContext(): SelfActorRef = self def getSelf(): ActorRef = self
/** /**
* Returns the 'self' reference with the API. * The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/ */
def context(): SelfActorRef = self def getSender: Option[ActorRef] = sender
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
def getSenderFuture: Option[Promise[Any]] = senderFuture
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
def getChannel: UntypedChannel = channel
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def setReceiveTimeout(timeout: Long): Unit = receiveTimeout = Some(timeout)
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getLinkedActors: JCollection[ActorRef] = linkedActors
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def getDispatcher(): MessageDispatcher = dispatcher
/** /**
* Java API for become * Java API for become

View file

@ -6,7 +6,7 @@ package akka.dispatch
import akka.AkkaException import akka.AkkaException
import akka.event.EventHandler import akka.event.EventHandler
import akka.actor.{ Actor, ForwardableChannel, UntypedChannel, Scheduler, Timeout, ExceptionChannel } import akka.actor.{ Actor, UntypedChannel, Scheduler, Timeout, ExceptionChannel }
import scala.Option import scala.Option
import akka.japi.{ Procedure, Function JFunc, Option JOption } import akka.japi.{ Procedure, Function JFunc, Option JOption }
@ -933,7 +933,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
private def timeLeftNoinline(): Long = timeLeft() private def timeLeftNoinline(): Long = timeLeft()
} }
class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with ForwardableChannel with ExceptionChannel[Any] { class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with UntypedChannel with ExceptionChannel[Any] {
def !(message: Any)(implicit channel: UntypedChannel) = completeWithResult(message) def !(message: Any)(implicit channel: UntypedChannel) = completeWithResult(message)

View file

@ -72,8 +72,8 @@ abstract class MessageDispatcher extends Serializable {
/** /**
* Attaches the specified actor instance to this dispatcher * Attaches the specified actor instance to this dispatcher
*/ */
final def attach(actor: ActorInstance) { final def attach(actor: ActorInstance): Unit = {
var promise = new ActorPromise(Timeout.never)(this) val promise = new ActorPromise(Timeout.never)(this)
guard.lock.lock() guard.lock.lock()
try { try {
register(actor) register(actor)
@ -87,7 +87,7 @@ abstract class MessageDispatcher extends Serializable {
/** /**
* Detaches the specified actor instance from this dispatcher * Detaches the specified actor instance from this dispatcher
*/ */
final def detach(actor: ActorInstance) { final def detach(actor: ActorInstance): Unit = {
guard withGuard { guard withGuard {
unregister(actor) unregister(actor)
} }

View file

@ -98,7 +98,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
protected def _route(): Receive = { protected def _route(): Receive = {
// for testing... // for testing...
case Stat case Stat
self tryReply Stats(_delegates length) tryReply(Stats(_delegates length))
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid } _delegates = _delegates filterNot { _.uuid == victim.uuid }
case Death(victim, _, _) case Death(victim, _, _)
@ -152,7 +152,7 @@ trait SmallestMailboxSelector {
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
def mailboxSize(a: ActorRef): Int = a match { def mailboxSize(a: ActorRef): Int = a match {
case l: LocalActorRef l.dispatcher.mailboxSize(l.underlying) case l: LocalActorRef l.underlying.dispatcher.mailboxSize(l.underlying)
case _ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority case _ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
} }
@ -238,7 +238,7 @@ trait MailboxPressureCapacitor {
def pressureThreshold: Int def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int = def pressure(delegates: Seq[ActorRef]): Int =
delegates count { delegates count {
case a: LocalActorRef a.dispatcher.mailboxSize(a.underlying) > pressureThreshold case a: LocalActorRef a.underlying.dispatcher.mailboxSize(a.underlying) > pressureThreshold
case _ false case _ false
} }
} }
@ -249,8 +249,8 @@ trait MailboxPressureCapacitor {
trait ActiveFuturesPressureCapacitor { trait ActiveFuturesPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int = def pressure(delegates: Seq[ActorRef]): Int =
delegates count { delegates count {
case fc: ForwardableChannel fc.channel.isInstanceOf[Promise[_]] case a: LocalActorRef a.underlying.channel.isInstanceOf[Promise[_]]
case _ false case _ false
} }
} }

View file

@ -24,7 +24,7 @@ object TypedCamelTestSupport {
def countdown: Handler = { def countdown: Handler = {
case SetExpectedMessageCount(num) { case SetExpectedMessageCount(num) {
latch = new CountDownLatch(num) latch = new CountDownLatch(num)
self.reply(latch) reply(latch)
} }
case msg latch.countDown case msg latch.countDown
} }
@ -32,7 +32,7 @@ object TypedCamelTestSupport {
trait Respond { this: Actor trait Respond { this: Actor
def respond: Handler = { def respond: Handler = {
case msg: Message self.reply(response(msg)) case msg: Message reply(response(msg))
} }
def response(msg: Message): Any = "Hello %s" format msg.body def response(msg: Message): Any = "Hello %s" format msg.body
@ -42,8 +42,8 @@ object TypedCamelTestSupport {
val messages = Buffer[Any]() val messages = Buffer[Any]()
def retain: Handler = { def retain: Handler = {
case GetRetainedMessage self.reply(messages.last) case GetRetainedMessage reply(messages.last)
case GetRetainedMessages(p) self.reply(messages.toList.filter(p)) case GetRetainedMessages(p) reply(messages.toList.filter(p))
case msg { case msg {
messages += msg messages += msg
msg msg

View file

@ -127,11 +127,11 @@ private[camel] class ActivationTracker extends Actor {
def receive = { def receive = {
case SetExpectedActivationCount(num) { case SetExpectedActivationCount(num) {
activationLatch = new CountDownLatch(num) activationLatch = new CountDownLatch(num)
self.reply(activationLatch) reply(activationLatch)
} }
case SetExpectedDeactivationCount(num) { case SetExpectedDeactivationCount(num) {
deactivationLatch = new CountDownLatch(num) deactivationLatch = new CountDownLatch(num)
self.reply(deactivationLatch) reply(deactivationLatch)
} }
case EndpointActivated activationLatch.countDown case EndpointActivated activationLatch.countDown
case EndpointDeactivated deactivationLatch.countDown case EndpointDeactivated deactivationLatch.countDown

View file

@ -99,7 +99,7 @@ trait ProducerSupport { this: Actor ⇒
val producer = self val producer = self
// Need copies of channel reference here since the callback could be done // Need copies of channel reference here since the callback could be done
// later by another thread. // later by another thread.
val channel = self.channel val replyChannel = channel
def done(doneSync: Boolean): Unit = { def done(doneSync: Boolean): Unit = {
(doneSync, exchange.isFailed) match { (doneSync, exchange.isFailed) match {
@ -114,11 +114,11 @@ trait ProducerSupport { this: Actor ⇒
receiveAfterProduce(result) receiveAfterProduce(result)
private def dispatchAsync(result: Any) = { private def dispatchAsync(result: Any) = {
channel match { replyChannel match {
case _: ActorPromise case _: ActorPromise
producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, channel) producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, replyChannel)
case _ case _
producer.postMessageToMailbox(result, channel) producer.postMessageToMailbox(result, replyChannel)
} }
} }
}) })
@ -159,7 +159,7 @@ trait ProducerSupport { this: Actor ⇒
* actor). * actor).
*/ */
protected def receiveAfterProduce: Receive = { protected def receiveAfterProduce: Receive = {
case msg if (!oneway) self.reply(msg) case msg if (!oneway) reply(msg)
} }
/** /**

View file

@ -15,7 +15,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message; Message msg = (Message)message;
String body = msg.getBodyAs(String.class); String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class); String header = msg.getHeaderAs("test", String.class);
getContext().tryReply(String.format("%s %s", body, header)); tryReply(String.format("%s %s", body, header));
} }
} }

View file

@ -17,7 +17,7 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
Message msg = (Message)message; Message msg = (Message)message;
String body = msg.getBodyAs(String.class); String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class); String header = msg.getHeaderAs("test", String.class);
getContext().tryReply(String.format("%s %s", body, header)); tryReply(String.format("%s %s", body, header));
} }
} }

View file

@ -36,7 +36,7 @@ object CamelTestSupport {
def countdown: Handler = { def countdown: Handler = {
case SetExpectedMessageCount(num) { case SetExpectedMessageCount(num) {
latch = new CountDownLatch(num) latch = new CountDownLatch(num)
self.reply(latch) reply(latch)
} }
case msg latch.countDown case msg latch.countDown
} }
@ -44,7 +44,7 @@ object CamelTestSupport {
trait Respond { this: Actor trait Respond { this: Actor
def respond: Handler = { def respond: Handler = {
case msg: Message self.reply(response(msg)) case msg: Message reply(response(msg))
} }
def response(msg: Message): Any = "Hello %s" format msg.body def response(msg: Message): Any = "Hello %s" format msg.body
@ -54,8 +54,8 @@ object CamelTestSupport {
val messages = Buffer[Any]() val messages = Buffer[Any]()
def retain: Handler = { def retain: Handler = {
case GetRetainedMessage self.reply(messages.last) case GetRetainedMessage reply(messages.last)
case GetRetainedMessages(p) self.reply(messages.toList.filter(p)) case GetRetainedMessages(p) reply(messages.toList.filter(p))
case msg { case msg {
messages += msg messages += msg
msg msg

View file

@ -212,7 +212,7 @@ object ConsumerScalaTest {
class TestConsumer(uri: String) extends Actor with Consumer { class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri def endpointUri = uri
protected def receive = { protected def receive = {
case msg: Message self.reply("received %s" format msg.body) case msg: Message reply("received %s" format msg.body)
} }
} }
@ -227,7 +227,7 @@ object ConsumerScalaTest {
def endpointUri = uri def endpointUri = uri
override def autoack = false override def autoack = false
protected def receive = { protected def receive = {
case msg: Message self.reply(Ack) case msg: Message reply(Ack)
} }
} }
@ -248,15 +248,15 @@ object ConsumerScalaTest {
protected def receive = { protected def receive = {
case "fail" { throw new Exception("test") } case "fail" { throw new Exception("test") }
case "succeed" self.reply("ok") case "succeed" reply("ok")
} }
override def preRestart(reason: scala.Throwable, msg: Option[Any]) { override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("pr") tryReply("pr")
} }
override def postStop { override def postStop {
self.tryReply("ps") tryReply("ps")
} }
} }
@ -289,7 +289,7 @@ object ConsumerScalaTest {
} }
private def respondTo(msg: Message) = private def respondTo(msg: Message) =
if (valid) self.reply("accepted: %s" format msg.body) if (valid) reply("accepted: %s" format msg.body)
else throw new Exception("rejected: %s" format msg.body) else throw new Exception("rejected: %s" format msg.body)
} }

View file

@ -253,8 +253,8 @@ object ProducerFeatureTest {
class TestResponder extends Actor { class TestResponder extends Actor {
protected def receive = { protected def receive = {
case msg: Message msg.body match { case msg: Message msg.body match {
case "fail" self.reply(Failure(new Exception("failure"), msg.headers)) case "fail" reply(Failure(new Exception("failure"), msg.headers))
case _ self.reply(msg.transformBody { body: String "received %s" format body }) case _ reply(msg.transformBody { body: String "received %s" format body })
} }
} }
} }
@ -262,9 +262,9 @@ object ProducerFeatureTest {
class ReplyingForwardTarget extends Actor { class ReplyingForwardTarget extends Actor {
protected def receive = { protected def receive = {
case msg: Message case msg: Message
self.reply(msg.addHeader("test" -> "result")) reply(msg.addHeader("test" -> "result"))
case msg: Failure case msg: Failure
self.reply(Failure(msg.cause, msg.headers + ("test" -> "failure"))) reply(Failure(msg.cause, msg.headers + ("test" -> "failure")))
} }
} }

View file

@ -96,13 +96,13 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
object ActorComponentFeatureTest { object ActorComponentFeatureTest {
class CustomIdActor extends Actor { class CustomIdActor extends Actor {
protected def receive = { protected def receive = {
case msg: Message self.reply("Received %s" format msg.body) case msg: Message reply("Received %s" format msg.body)
} }
} }
class FailWithMessage extends Actor { class FailWithMessage extends Actor {
protected def receive = { protected def receive = {
case msg: Message self.reply(Failure(new Exception("test"))) case msg: Message reply(Failure(new Exception("test")))
} }
} }

View file

@ -25,7 +25,7 @@ object RegistryStoreMultiJvmSpec {
class HelloWorld1 extends Actor with Serializable { class HelloWorld1 extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World from node [" + Config.nodename + "]") reply("World from node [" + Config.nodename + "]")
} }
} }
@ -36,7 +36,7 @@ object RegistryStoreMultiJvmSpec {
Thread.sleep(1000) Thread.sleep(1000)
counter += 1 counter += 1
case "Count" case "Count"
self.reply(counter) reply(counter)
} }
} }
} }

View file

@ -25,7 +25,7 @@
* class HelloWorld extends Actor with Serializable { * class HelloWorld extends Actor with Serializable {
* def receive = { * def receive = {
* case "Hello" * case "Hello"
* self.reply("World from node [" + Config.nodename + "]") * reply("World from node [" + Config.nodename + "]")
* } * }
* } * }
* } * }

View file

@ -23,9 +23,9 @@
// def receive = { // def receive = {
// case Count(nr) // case Count(nr)
// log += nr.toString // log += nr.toString
// self.reply("World from node [" + Config.nodename + "]") // reply("World from node [" + Config.nodename + "]")
// case GetLog // case GetLog
// self.reply(Log(log)) // reply(Log(log))
// } // }
// } // }
// } // }

View file

@ -25,9 +25,9 @@
// case Count(nr) // case Count(nr)
// log += nr.toString // log += nr.toString
// //println("Message to HelloWorld log =======> " + log) // //println("Message to HelloWorld log =======> " + log)
// self.reply("World from node [" + Config.nodename + "]") // reply("World from node [" + Config.nodename + "]")
// case GetLog // case GetLog
// self.reply(Log(log)) // reply(Log(log))
// } // }
// } // }
// } // }

View file

@ -22,12 +22,12 @@
// var log = "" // var log = ""
// def receive = { // def receive = {
// case Count(nr) // case Count(nr)
// println("Received number: " + nr + " on " + self.uuid) // println("Received number: " + nr + " on " + self.address)
// log += nr.toString // log += nr.toString
// self.reply("World from node [" + Config.nodename + "]") // reply("World from node [" + Config.nodename + "]")
// case GetLog // case GetLog
// println("Received getLog on " + self.uuid) // println("Received getLog on " + uuid)
// self.reply(Log(log)) // reply(Log(log))
// } // }
// } // }
// } // }

View file

@ -23,9 +23,9 @@
// def receive = { // def receive = {
// case Count(nr) // case Count(nr)
// log += nr.toString // log += nr.toString
// self.reply("World from node [" + Config.nodename + "]") // reply("World from node [" + Config.nodename + "]")
// case GetLog // case GetLog
// self.reply(Log(log)) // reply(Log(log))
// } // }
// } // }
// } // }

View file

@ -20,7 +20,7 @@ object DirectRoutingFailoverMultiJvmSpec {
def receive = { def receive = {
case "identify" case "identify"
self.reply(Config.nodename) reply(Config.nodename)
} }
} }
} }

View file

@ -13,7 +13,7 @@ object HomeNodeMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "identify" { case "identify" {
self.reply(Config.nodename) reply(Config.nodename)
} }
} }
} }

View file

@ -16,7 +16,7 @@ object SingleReplicaDirectRoutingMultiJvmSpec {
def receive = { def receive = {
case "identify" { case "identify" {
//println("The node received the 'identify' command: " + Config.nodename) //println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename) reply(Config.nodename)
} }
} }
} }

View file

@ -20,7 +20,7 @@ object RandomFailoverMultiJvmSpec {
def receive = { def receive = {
case "identify" case "identify"
self.reply(Config.nodename) reply(Config.nodename)
} }
} }

View file

@ -13,7 +13,7 @@ object HomeNodeMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "identify" { case "identify" {
self.reply(Config.nodename) reply(Config.nodename)
} }
} }
} }

View file

@ -19,7 +19,7 @@ object Random1ReplicaMultiJvmSpec {
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World from node [" + Config.nodename + "]") reply("World from node [" + Config.nodename + "]")
} }
} }

View file

@ -20,7 +20,7 @@ object Random3ReplicasMultiJvmSpec {
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World from node [" + Config.nodename + "]") reply("World from node [" + Config.nodename + "]")
} }
} }
} }

View file

@ -21,7 +21,7 @@ object RoundRobinFailoverMultiJvmSpec {
def receive = { def receive = {
case "identify" case "identify"
self.reply(Config.nodename) reply(Config.nodename)
} }
} }

View file

@ -13,7 +13,7 @@ object HomeNodeMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "identify" { case "identify" {
self.reply(Config.nodename) reply(Config.nodename)
} }
} }
} }

View file

@ -18,7 +18,7 @@ object RoundRobin1ReplicaMultiJvmSpec {
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World from node [" + Config.nodename + "]") reply("World from node [" + Config.nodename + "]")
} }
} }

View file

@ -31,7 +31,7 @@ object RoundRobin2ReplicasMultiJvmSpec {
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
self.reply("World from node [" + Config.nodename + "]") reply("World from node [" + Config.nodename + "]")
} }
} }
} }

View file

@ -27,7 +27,7 @@
// class HelloWorld extends Actor with Serializable { // class HelloWorld extends Actor with Serializable {
// def receive = { // def receive = {
// case "Hello" // case "Hello"
// self.reply("World from node [" + Config.nodename + "]") // reply("World from node [" + Config.nodename + "]")
// } // }
// } // }
// } // }

View file

@ -32,11 +32,11 @@ object ScatterGatherFailoverMultiJvmSpec {
case Shutdown(None) shutdownNode case Shutdown(None) shutdownNode
case Sleep(node) if node.equals(Config.nodename) case Sleep(node) if node.equals(Config.nodename)
Thread sleep 100 Thread sleep 100
self.reply(Config.nodename) reply(Config.nodename)
case Shutdown(Some(node)) if node.equals(Config.nodename) shutdownNode case Shutdown(Some(node)) if node.equals(Config.nodename) shutdownNode
case _ case _
Thread sleep 100 Thread sleep 100
self.reply(Config.nodename) reply(Config.nodename)
} }
} }

View file

@ -58,7 +58,7 @@ object PingPongMultiJvmExample {
def receive = { def receive = {
case Ping case Ping
println("---->> PONG") println("---->> PONG")
self reply Pong reply(Pong)
} }
} }
} }

View file

@ -39,9 +39,9 @@ object PingPong {
if (count < NrOfPings) { if (count < NrOfPings) {
println("---->> PING (%s)" format count) println("---->> PING (%s)" format count)
count += 1 count += 1
self reply Ball reply(Ball)
} else { } else {
self.sender.foreach(s (s ? Stop).await) sender.foreach(s (s ? Stop).await)
gameOverLatch.countDown gameOverLatch.countDown
self.stop self.stop
} }
@ -53,9 +53,9 @@ object PingPong {
class PongActor extends Actor with Serializable { class PongActor extends Actor with Serializable {
def receive = { def receive = {
case Ball case Ball
self reply Ball reply(Ball)
case Stop case Stop
self reply Stop reply(Stop)
self.stop self.stop
} }
} }

View file

@ -43,7 +43,7 @@ object Pi extends App {
def receive = { def receive = {
case Work(start, nrOfElements) => case Work(start, nrOfElements) =>
self reply Result(calculatePiFor(start, nrOfElements)) // perform the work reply(Result(calculatePiFor(start, nrOfElements))) // perform the work
} }
} }
//#worker //#worker

View file

@ -15,7 +15,7 @@ import akka.dispatch.MessageDispatcher
object DurableMailboxSpecActorFactory { object DurableMailboxSpecActorFactory {
class MailboxTestActor extends Actor { class MailboxTestActor extends Actor {
def receive = { case "sum" self.reply("sum") } def receive = { case "sum" reply("sum") }
} }
def createMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = def createMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef =

View file

@ -26,9 +26,8 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoNaiveDura
/*object DurableMongoMailboxSpecActorFactory { /*object DurableMongoMailboxSpecActorFactory {
class MongoMailboxTestActor extends Actor { class MongoMailboxTestActor extends Actor {
self.lifeCycle = Temporary
def receive = { def receive = {
case "sum" => self.reply("sum") case "sum" => reply("sum")
} }
} }

View file

@ -258,7 +258,7 @@ trait Endpoint { this: Actor ⇒
if (!endpoints.isEmpty) endpoints.foreach { _.apply(uri) ! req } if (!endpoints.isEmpty) endpoints.foreach { _.apply(uri) ! req }
else { else {
self.channel match { channel match {
case null | NullChannel _na(uri, req) case null | NullChannel _na(uri, req)
case channel channel ! NoneAvailable(uri, req) case channel channel ! NoneAvailable(uri, req)
} }

View file

@ -129,13 +129,13 @@ object ActorSerialization {
} }
} }
l.receiveTimeout.foreach(builder.setReceiveTimeout(_)) l.underlying.receiveTimeout.foreach(builder.setReceiveTimeout(_))
val actorInstance = l.underlyingActorInstance val actorInstance = l.underlyingActorInstance
Serialization.serialize(actorInstance.asInstanceOf[T]) match { Serialization.serialize(actorInstance.asInstanceOf[T]) match {
case Right(bytes) builder.setActorInstance(ByteString.copyFrom(bytes)) case Right(bytes) builder.setActorInstance(ByteString.copyFrom(bytes))
case Left(exception) throw new Exception("Error serializing : " + actorInstance.getClass.getName) case Left(exception) throw new Exception("Error serializing : " + actorInstance.getClass.getName)
} }
val stack = l.hotswap val stack = l.underlying.hotswap
if (!stack.isEmpty) if (!stack.isEmpty)
builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(stack))) builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(stack)))
} }

View file

@ -30,7 +30,7 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte
val actor2 = fromBinary(bytes).asInstanceOf[LocalActorRef] val actor2 = fromBinary(bytes).asInstanceOf[LocalActorRef]
(actor2 ? "hello").get should equal("world 3") (actor2 ? "hello").get should equal("world 3")
actor2.receiveTimeout should equal(Some(1000)) actor2.underlying.receiveTimeout should equal(Some(1000))
actor1.stop() actor1.stop()
actor2.stop() actor2.stop()
} }
@ -40,15 +40,15 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte
val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true) val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true)
for (i 1 to 10) actor1 ! "hello" for (i 1 to 10) actor1 ! "hello"
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("world") (actor2 ? "hello-reply").get should equal("world")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("world") (actor3 ? "hello-reply").get should equal("world")
} }
@ -66,15 +66,15 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte
(actor1 ! p1) (actor1 ! p1)
(actor1 ! p1) (actor1 ! p1)
(actor1 ! p1) (actor1 ! p1)
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("hello") (actor2 ? "hello-reply").get should equal("hello")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("hello") (actor3 ? "hello-reply").get should equal("hello")
} }
} }
@ -103,15 +103,15 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte
val msg = MyMessage(123, "debasish ghosh", true) val msg = MyMessage(123, "debasish ghosh", true)
val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build
for (i 1 to 10) actor1 ! b for (i 1 to 10) actor1 ! b
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("world") (actor2 ? "hello-reply").get should equal("world")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000) Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("world") (actor3 ? "hello-reply").get should equal("world")
} }
} }
@ -119,12 +119,12 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte
class MyJavaSerializableActor extends Actor with scala.Serializable { class MyJavaSerializableActor extends Actor with scala.Serializable {
var count = 0 var count = 0
self.receiveTimeout = Some(1000) receiveTimeout = Some(1000)
def receive = { def receive = {
case "hello" case "hello"
count = count + 1 count = count + 1
self.reply("world " + count) reply("world " + count)
} }
} }
@ -132,7 +132,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializabl
def receive = { def receive = {
case "hello" case "hello"
Thread.sleep(500) Thread.sleep(500)
case "hello-reply" self.reply("world") case "hello-reply" reply("world")
} }
} }
@ -140,7 +140,7 @@ class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable
def receive = { def receive = {
case m: Message case m: Message
Thread.sleep(500) Thread.sleep(500)
case "hello-reply" self.reply("world") case "hello-reply" reply("world")
} }
} }
@ -148,6 +148,6 @@ class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable {
def receive = { def receive = {
case p: Person case p: Person
Thread.sleep(500) Thread.sleep(500)
case "hello-reply" self.reply("hello") case "hello-reply" reply("hello")
} }
} }

View file

@ -15,6 +15,6 @@ public class UntypedConsumer1 extends UntypedConsumerActor {
public void onReceive(Object message) { public void onReceive(Object message) {
Message msg = (Message)message; Message msg = (Message)message;
String body = msg.getBodyAs(String.class); String body = msg.getBodyAs(String.class);
getContext().tryReply(String.format("received %s", body)); tryReply(String.format("received %s", body));
} }
} }

View file

@ -12,7 +12,7 @@ class RemoteActor1 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1" def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
protected def receive = { protected def receive = {
case msg: Message self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1"))) case msg: Message reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
} }
} }
@ -23,7 +23,7 @@ class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2" def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
protected def receive = { protected def receive = {
case msg: Message self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2"))) case msg: Message reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
} }
} }
@ -44,7 +44,7 @@ class Consumer2 extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/default" def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
def receive = { def receive = {
case msg: Message self.reply("Hello %s" format msg.bodyAs[String]) case msg: Message reply("Hello %s" format msg.bodyAs[String])
} }
} }
@ -62,10 +62,10 @@ class Consumer4 extends Actor with Consumer {
def receive = { def receive = {
case msg: Message msg.bodyAs[String] match { case msg: Message msg.bodyAs[String] match {
case "stop" { case "stop" {
self.reply("Consumer4 stopped") reply("Consumer4 stopped")
self.stop self.stop
} }
case body self.reply(body) case body reply(body)
} }
} }
} }
@ -76,7 +76,7 @@ class Consumer5 extends Actor with Consumer {
def receive = { def receive = {
case _ { case _ {
Actor.actorOf[Consumer4] Actor.actorOf[Consumer4]
self.reply("Consumer4 started") reply("Consumer4 started")
} }
} }
} }
@ -106,7 +106,7 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu
protected def receive = { protected def receive = {
case msg: Message { case msg: Message {
publisher ! msg.bodyAs[String] publisher ! msg.bodyAs[String]
self.reply("message published") reply("message published")
} }
} }
} }
@ -135,8 +135,8 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer {
class HttpTransformer extends Actor { class HttpTransformer extends Actor {
protected def receive = { protected def receive = {
case msg: Message self.reply(msg.transformBody { body: String body replaceAll ("Akka ", "AKKA ") }) case msg: Message reply(msg.transformBody { body: String body replaceAll ("Akka ", "AKKA ") })
case msg: Failure self.reply(msg) case msg: Failure reply(msg)
} }
} }
@ -150,11 +150,11 @@ class FileConsumer extends Actor with Consumer {
case msg: Message { case msg: Message {
if (counter == 2) { if (counter == 2) {
println("received %s" format msg.bodyAs[String]) println("received %s" format msg.bodyAs[String])
self.reply(Ack) reply(Ack)
} else { } else {
println("rejected %s" format msg.bodyAs[String]) println("rejected %s" format msg.bodyAs[String])
counter += 1 counter += 1
self.reply(Failure(new Exception("message number %s not accepted" format counter))) reply(Failure(new Exception("message number %s not accepted" format counter)))
} }
} }
} }

View file

@ -15,7 +15,7 @@ public class SampleRemoteUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message; Message msg = (Message)message;
String body = msg.getBodyAs(String.class); String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class); String header = msg.getHeaderAs("test", String.class);
getContext().tryReply(String.format("%s %s", body, header)); tryReply(String.format("%s %s", body, header));
} }
} }

View file

@ -66,7 +66,7 @@ object HttpConcurrencyTestStress {
var correlationIds = Set[Any]() var correlationIds = Set[Any]()
override protected def receive = { override protected def receive = {
case "getCorrelationIdCount" self.reply(correlationIds.size) case "getCorrelationIdCount" reply(correlationIds.size)
case msg super.receive(msg) case msg super.receive(msg)
} }
@ -93,7 +93,7 @@ object HttpConcurrencyTestStress {
class HttpServerWorker extends Actor { class HttpServerWorker extends Actor {
protected def receive = { protected def receive = {
case msg self.reply(msg) case msg reply(msg)
} }
} }
} }

View file

@ -94,8 +94,8 @@ object RemoteConsumerTest {
def endpointUri = "direct:remote-consumer" def endpointUri = "direct:remote-consumer"
protected def receive = { protected def receive = {
case "init" self.reply("done") case "init" reply("done")
case m: Message self.reply("remote actor: %s" format m.body) case m: Message reply("remote actor: %s" format m.body)
} }
} }
} }

View file

@ -92,8 +92,6 @@
* Memory-backed chat storage implementation. * Memory-backed chat storage implementation.
*/ */
class MemoryChatStorage extends ChatStorage { class MemoryChatStorage extends ChatStorage {
self.lifeCycle = Permanent
private var chatLog = TransactionalVector[Array[Byte]]() private var chatLog = TransactionalVector[Array[Byte]]()
EventHandler.info(this, "Memory-based chat storage is starting up...") EventHandler.info(this, "Memory-based chat storage is starting up...")
@ -105,7 +103,7 @@
case GetChatLog(_) => case GetChatLog(_) =>
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList } val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
self.reply(ChatLog(messageList)) reply(ChatLog(messageList))
} }
override def postRestart(reason: Throwable) { override def postRestart(reason: Throwable) {
@ -176,7 +174,7 @@
* Chat server. Manages sessions and redirects all other messages to the Session for the client. * Chat server. Manages sessions and redirects all other messages to the Session for the client.
*/ */
trait ChatServer extends Actor { trait ChatServer extends Actor {
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000) //faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000)
val storage: ActorRef val storage: ActorRef
EventHandler.info(this, "Chat server is starting up...") EventHandler.info(this, "Chat server is starting up...")

View file

@ -38,7 +38,7 @@ class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] {
// When a chopstick is available, it can be taken by a some hakker // When a chopstick is available, it can be taken by a some hakker
when(Available) { when(Available) {
case Event(Take, _) case Event(Take, _)
goto(Taken) using TakenBy(self.channel) replying Taken(self) goto(Taken) using TakenBy(channel) replying Taken(self)
} }
// When a chopstick is taken by a hakker // When a chopstick is taken by a hakker
@ -47,7 +47,7 @@ class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] {
when(Taken) { when(Taken) {
case Event(Take, currentState) case Event(Take, currentState)
stay replying Busy(self) stay replying Busy(self)
case Event(Put, TakenBy(hakker)) if self.channel == hakker case Event(Put, TakenBy(hakker)) if channel == hakker
goto(Available) using TakenBy(NullChannel) goto(Available) using TakenBy(NullChannel)
} }

View file

@ -26,6 +26,6 @@ class Activator extends BundleActivator {
class EchoActor extends Actor { class EchoActor extends Actor {
override def receive = { override def receive = {
case x => self.reply(x) case x => reply(x)
} }
} }

View file

@ -10,7 +10,7 @@ import akka.actor. {ActorRegistry, Actor}
class HelloWorldActor extends Actor { class HelloWorldActor extends Actor {
def receive = { def receive = {
case "Hello" => case "Hello" =>
self.reply("World") reply("World")
} }
} }

View file

@ -38,7 +38,7 @@ object AkkaKernelPlugin extends Plugin {
val distNeedsPackageBin = dist <<= dist.dependsOn(packageBin in Compile) val distNeedsPackageBin = dist <<= dist.dependsOn(packageBin in Compile)
override lazy val settings = lazy val distSettings: Seq[Setting[_]] =
inConfig(Dist)(Seq( inConfig(Dist)(Seq(
dist <<= packageBin.identity, dist <<= packageBin.identity,
packageBin <<= distTask, packageBin <<= distTask,

View file

@ -284,8 +284,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = { def receive = {
case update: Update[_] case update: Update[_]
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] }) tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
case Get self reply agent.get case Get reply(agent.get)
case _ () case _ ()
} }
} }
@ -298,7 +298,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = { def receive = {
case update: Update[_] try { case update: Update[_] try {
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] }) tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
} finally { } finally {
agent.resume() agent.resume()
self.stop() self.stop()

View file

@ -32,7 +32,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
} else if (incoming instanceof String) { } else if (incoming instanceof String) {
String message = (String) incoming; String message = (String) incoming;
if (message.equals("GetCount")) { if (message.equals("GetCount")) {
getContext().reply(count.get()); reply(count.get());
} }
} }
} }

View file

@ -26,7 +26,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getContext().reply(count.get()); reply(count.get());
return true; return true;
} else return false; } else return false;
} }

View file

@ -57,7 +57,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
} else if (incoming instanceof String) { } else if (incoming instanceof String) {
String message = (String) incoming; String message = (String) incoming;
if (message.equals("GetCount")) { if (message.equals("GetCount")) {
getContext().reply(count.get()); reply(count.get());
} }
} }
} }

View file

@ -70,7 +70,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getContext().reply(count.get()); reply(count.get());
return true; return true;
} else return false; } else return false;
} }

View file

@ -35,7 +35,7 @@ object CoordinatedIncrement {
} }
} }
case GetCount self.reply(count.get) case GetCount reply(count.get)
} }
} }

View file

@ -55,7 +55,7 @@ object FickleFriends {
} }
} }
case GetCount self.reply(count.get) case GetCount reply(count.get)
} }
} }
@ -92,7 +92,7 @@ object FickleFriends {
} }
} }
case GetCount self.reply(count.get) case GetCount reply(count.get)
} }
} }
} }

View file

@ -50,7 +50,7 @@ object TransactorIncrement {
} }
override def normally = { override def normally = {
case GetCount self.reply(count.get) case GetCount reply(count.get)
} }
} }

View file

@ -47,7 +47,7 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor
*/ */
override def supervisor_=(a: Option[ActorRef]) { override def supervisor_=(a: Option[ActorRef]) {
a match { //TODO This should probably be removed since the Supervisor could be a remote actor for all we know a match { //TODO This should probably be removed since the Supervisor could be a remote actor for all we know
case Some(l: SelfActorRef) if !l.dispatcher.isInstanceOf[CallingThreadDispatcher] case Some(l: LocalActorRef) if !l.underlying.dispatcher.isInstanceOf[CallingThreadDispatcher]
EventHandler.warning(this, "supervisor " + l + " does not use CallingThreadDispatcher") EventHandler.warning(this, "supervisor " + l + " does not use CallingThreadDispatcher")
case _ case _
} }

View file

@ -45,7 +45,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[
case Event(x: AnyRef, ign) case Event(x: AnyRef, ign)
val ignore = ign map (z if (z isDefinedAt x) z(x) else false) getOrElse false val ignore = ign map (z if (z isDefinedAt x) z(x) else false) getOrElse false
if (!ignore) { if (!ignore) {
queue.offerLast(RealMessage(x, self.channel)) queue.offerLast(RealMessage(x, channel))
} }
stay stay
} }

View file

@ -40,22 +40,22 @@ object TestActorRefSpec {
def receiveT = { def receiveT = {
case "complexRequest" { case "complexRequest" {
replyTo = self.channel replyTo = channel
val worker = TestActorRef(Props[WorkerActor]) val worker = TestActorRef(Props[WorkerActor])
worker ! "work" worker ! "work"
} }
case "complexRequest2" case "complexRequest2"
val worker = TestActorRef(Props[WorkerActor]) val worker = TestActorRef(Props[WorkerActor])
worker ! self.channel worker ! channel
case "workDone" replyTo ! "complexReply" case "workDone" replyTo ! "complexReply"
case "simpleRequest" self.reply("simpleReply") case "simpleRequest" reply("simpleReply")
} }
} }
class WorkerActor() extends TActor { class WorkerActor() extends TActor {
def receiveT = { def receiveT = {
case "work" { case "work" {
self.reply("workDone") reply("workDone")
self.stop() self.stop()
} }
case replyTo: UntypedChannel { case replyTo: UntypedChannel {
@ -111,7 +111,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"used with TestActorRef" in { "used with TestActorRef" in {
val a = TestActorRef(Props(new Actor { val a = TestActorRef(Props(new Actor {
val nested = TestActorRef(Props(self { case _ })) val nested = TestActorRef(Props(self { case _ }))
def receive = { case _ self reply nested } def receive = { case _ reply(nested) }
})) }))
a must not be (null) a must not be (null)
val nested = (a ? "any").as[ActorRef].get val nested = (a ? "any").as[ActorRef].get
@ -122,7 +122,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"used with ActorRef" in { "used with ActorRef" in {
val a = TestActorRef(Props(new Actor { val a = TestActorRef(Props(new Actor {
val nested = Actor.actorOf(Props(self { case _ })) val nested = Actor.actorOf(Props(self { case _ }))
def receive = { case _ self reply nested } def receive = { case _ reply(nested) }
})) }))
a must not be (null) a must not be (null)
val nested = (a ? "any").as[ActorRef].get val nested = (a ? "any").as[ActorRef].get
@ -216,12 +216,12 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"set receiveTimeout to None" in { "set receiveTimeout to None" in {
val a = TestActorRef[WorkerActor] val a = TestActorRef[WorkerActor]
a.receiveTimeout must be(None) a.underlyingActor.receiveTimeout must be(None)
} }
"set CallingThreadDispatcher" in { "set CallingThreadDispatcher" in {
val a = TestActorRef[WorkerActor] val a = TestActorRef[WorkerActor]
a.dispatcher.getClass must be(classOf[CallingThreadDispatcher]) a.underlying.dispatcher.getClass must be(classOf[CallingThreadDispatcher])
} }
"warn about scheduled supervisor" in { "warn about scheduled supervisor" in {

View file

@ -78,7 +78,7 @@ public class Pi {
double result = calculatePiFor(work.getStart(), work.getNrOfElements()); double result = calculatePiFor(work.getStart(), work.getNrOfElements());
// reply with the result // reply with the result
getContext().reply(new Result(result)); reply(new Result(result));
} else throw new IllegalArgumentException("Unknown message [" + message + "]"); } else throw new IllegalArgumentException("Unknown message [" + message + "]");
} }
@ -118,7 +118,7 @@ public class Pi {
if (message instanceof Calculate) { if (message instanceof Calculate) {
// schedule work // schedule work
for (int start = 0; start < nrOfMessages; start++) { for (int start = 0; start < nrOfMessages; start++) {
router.tell(new Work(start, nrOfElements), getContext()); router.tell(new Work(start, nrOfElements), getSelf());
} }
// send a PoisonPill to all workers telling them to shut down themselves // send a PoisonPill to all workers telling them to shut down themselves
@ -133,7 +133,7 @@ public class Pi {
Result result = (Result) message; Result result = (Result) message;
pi += result.getValue(); pi += result.getValue();
nrOfResults += 1; nrOfResults += 1;
if (nrOfResults == nrOfMessages) getContext().stop(); if (nrOfResults == nrOfMessages) getSelf().stop();
} else throw new IllegalArgumentException("Unknown message [" + message + "]"); } else throw new IllegalArgumentException("Unknown message [" + message + "]");
} }

View file

@ -40,7 +40,7 @@ object Pi extends App {
def receive = { def receive = {
case Work(start, nrOfElements) case Work(start, nrOfElements)
self reply Result(calculatePiFor(start, nrOfElements)) // perform the work reply(Result(calculatePiFor(start, nrOfElements))) // perform the work
} }
} }

View file

@ -76,7 +76,7 @@ public class Pi {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof Work) { if (message instanceof Work) {
Work work = (Work) message; Work work = (Work) message;
getContext().reply(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work reply(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work
} else throw new IllegalArgumentException("Unknown message [" + message + "]"); } else throw new IllegalArgumentException("Unknown message [" + message + "]");
} }
} }
@ -120,10 +120,10 @@ public class Pi {
public void apply(Object msg) { public void apply(Object msg) {
// schedule work // schedule work
for (int arg = 0; arg < nrOfMessages; arg++) { for (int arg = 0; arg < nrOfMessages; arg++) {
router.tell(new Work(arg, nrOfElements), getContext()); router.tell(new Work(arg, nrOfElements), getSelf());
} }
// Assume the gathering behavior // Assume the gathering behavior
become(gather(getContext().getChannel())); become(gather(getChannel()));
} }
}; };
@ -138,7 +138,7 @@ public class Pi {
// send the pi result back to the guy who started the calculation // send the pi result back to the guy who started the calculation
recipient.tell(pi); recipient.tell(pi);
// shut ourselves down, we're done // shut ourselves down, we're done
getContext().stop(); getSelf().stop();
} }
} }
}; };

View file

@ -38,7 +38,7 @@ object Pi extends App {
def receive = { def receive = {
case Work(arg, nrOfElements) case Work(arg, nrOfElements)
self reply Result(calculatePiFor(arg, nrOfElements)) // perform the work reply(Result(calculatePiFor(arg, nrOfElements))) // perform the work
} }
} }
@ -62,7 +62,7 @@ object Pi extends App {
for (arg 0 until nrOfMessages) router ! Work(arg, nrOfElements) for (arg 0 until nrOfMessages) router ! Work(arg, nrOfElements)
//Assume the gathering behavior //Assume the gathering behavior
this become gather(self.channel) this become gather(channel)
} }
// phase 2, aggregate the results of the Calculation // phase 2, aggregate the results of the Calculation

View file

@ -331,7 +331,8 @@ object AkkaBuild extends Build {
testExcludes := akkaTestExcludes, testExcludes := akkaTestExcludes,
testOptions in Test <++= testExcludes map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) }, testOptions in Test <++= testExcludes map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) },
testOptions in Test += Tests.Argument("-oF") //Gimmeh gud stacktraces plz // show full stack traces
testOptions in Test += Tests.Argument("-oF")
) )
lazy val formatSettings = ScalariformPlugin.settings ++ Seq( lazy val formatSettings = ScalariformPlugin.settings ++ Seq(