#1299 - Removing reply and tryReply, preparing the way for 'sender ! response'

This commit is contained in:
Viktor Klang 2011-10-19 16:59:47 +02:00
parent 2d4251fcee
commit 77dc9e9c35
61 changed files with 144 additions and 186 deletions

View file

@ -2,6 +2,6 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) {
tryReply("got it!");
getChannel().tryTell("got it!");
}
}

View file

@ -15,7 +15,7 @@ object ActorFireForgetRequestReplySpec {
class ReplyActor extends Actor {
def receive = {
case "Send"
reply("Reply")
channel ! "Reply"
case "SendImplicit"
channel ! "ReplyImplicit"
}

View file

@ -25,7 +25,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val currentGen = generationProvider.getAndIncrement()
override def preStart() { report("preStart") }
override def postStop() { report("postStop") }
def receive = { case "status" this reply message("OK") }
def receive = { case "status" channel ! message("OK") }
}
"An Actor" must {

View file

@ -35,7 +35,7 @@ object ActorRefSpec {
val worker = context.actorOf(Props[WorkerActor])
worker ! ReplyTo(channel)
case "workDone" replyTo ! "complexReply"
case "simpleRequest" reply("simpleReply")
case "simpleRequest" channel ! "simpleReply"
}
}
@ -43,7 +43,7 @@ object ActorRefSpec {
def receive = {
case "work" {
work
reply("workDone")
channel ! "workDone"
self.stop()
}
case ReplyTo(replyTo) {
@ -74,7 +74,7 @@ object ActorRefSpec {
class OuterActor(val inner: ActorRef) extends Actor {
def receive = {
case "self" reply(self)
case "self" channel ! self
case x inner forward x
}
}
@ -83,7 +83,7 @@ object ActorRefSpec {
val fail = new InnerActor
def receive = {
case "self" reply(self)
case "self" channel ! self
case x inner forward x
}
}
@ -94,8 +94,8 @@ object ActorRefSpec {
class InnerActor extends Actor {
def receive = {
case "innerself" reply(self)
case other reply(other)
case "innerself" channel ! self
case other channel ! other
}
}
@ -103,8 +103,8 @@ object ActorRefSpec {
val fail = new InnerActor
def receive = {
case "innerself" reply(self)
case other reply(other)
case "innerself" channel ! self
case other channel ! other
}
}
@ -321,7 +321,7 @@ class ActorRefSpec extends AkkaSpec {
"support nested actorOfs" in {
val a = actorOf(new Actor {
val nested = actorOf(new Actor { def receive = { case _ } })
def receive = { case _ reply(nested) }
def receive = { case _ channel ! nested }
})
val nested = (a ? "any").as[ActorRef].get
@ -369,8 +369,8 @@ class ActorRefSpec extends AkkaSpec {
val timeout = Timeout(20000)
val ref = actorOf(Props(new Actor {
def receive = {
case 5 tryReply("five")
case null tryReply("null")
case 5 channel.tryTell("five")
case null channel.tryTell("null")
}
}))

View file

@ -15,7 +15,7 @@ object ForwardActorSpec {
def createForwardingChain(app: AkkaApplication): ActorRef = {
val replier = app.actorOf(new Actor {
def receive = { case x reply(x) }
def receive = { case x channel ! x }
})
def mkforwarder(forwardTo: ActorRef) = app.actorOf(

View file

@ -48,7 +48,7 @@ object IOActorSpec {
def receiveIO = {
case length: Int
val bytes = socket.read(length)
reply(bytes)
channel ! bytes
}
}
}
@ -108,9 +108,9 @@ object IOActorSpec {
case msg: NewClient createWorker forward msg
case ('set, key: String, value: ByteString)
kvs += (key -> value)
tryReply(())
case ('get, key: String) tryReply(kvs.get(key))
case 'getall tryReply(kvs)
channel.tryTell(())(self)
case ('get, key: String) channel.tryTell(kvs.get(key))(self)
case 'getall channel.tryTell(kvs)(self)
}
}
@ -123,18 +123,20 @@ object IOActorSpec {
socket = connect(ioManager, host, port)
}
def reply(msg: Any) = channel.tryTell(msg)(self)
def receiveIO = {
case ('set, key: String, value: ByteString)
socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
tryReply(readResult)
reply(readResult)
case ('get, key: String)
socket write ByteString("GET " + key + "\r\n")
tryReply(readResult)
reply(readResult)
case 'getall
socket write ByteString("GETALL\r\n")
tryReply(readResult)
reply(readResult)
}
def readResult = {

View file

@ -62,7 +62,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
app.eventHandler.addListener(testActor)
val actor = TestActorRef(new Actor {
def receive = loggable(this) {
case _ reply("x")
case _ channel ! "x"
}
})
actor ! "buh"
@ -91,7 +91,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
app.eventHandler.addListener(testActor)
val actor = TestActorRef(new Actor {
def receive = loggable(this)(loggable(this) {
case _ reply("x")
case _ channel ! "x"
})
})
actor ! "buh"

View file

@ -24,7 +24,7 @@ class SupervisorMiscSpec extends AkkaSpec {
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "status" this.reply("OK")
case "status" this.channel ! "OK"
case _ this.self.stop()
}
}).withSupervisor(supervisor)

View file

@ -42,7 +42,7 @@ object SupervisorSpec {
def receive = {
case Ping
messageLog.put(PingMessage)
tryReply(PongMessage)
channel.tryTell(PongMessage)
case Die
throw new RuntimeException(ExceptionMessage)
}
@ -298,7 +298,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
def receive = {
case Ping tryReply(PongMessage)
case Ping channel.tryTell(PongMessage)
case Die throw new RuntimeException("Expected")
}
}).withSupervisor(supervisor))

View file

@ -47,11 +47,11 @@ object Ticket669Spec {
}
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
tryReply("failure1")
channel.tryTell("failure1")
}
override def postStop() {
tryReply("failure2")
channel.tryTell("failure2")
}
}
}

View file

@ -70,8 +70,8 @@ object ActorModelSpec {
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; reply(msg); busy.switchOff()
case TryReply(msg) ack; tryReply(msg); busy.switchOff()
case Reply(msg) ack; channel ! msg; busy.switchOff()
case TryReply(msg) ack; channel.tryTell(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()

View file

@ -9,7 +9,7 @@ import akka.actor.{ Props, Actor }
object DispatcherActorSpec {
class TestActor extends Actor {
def receive = {
case "Hello" reply("World")
case "Hello" channel ! "World"
case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}

View file

@ -12,7 +12,7 @@ import org.scalatest.BeforeAndAfterEach
object PinnedActorSpec {
class TestActor extends Actor {
def receive = {
case "Hello" reply("World")
case "Hello" channel ! "World"
case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}

View file

@ -17,20 +17,16 @@ import org.scalatest.junit.JUnitSuite
object FutureSpec {
class TestActor extends Actor {
def receive = {
case "Hello"
reply("World")
case "NoReply" {}
case "Failure"
throw new RuntimeException("Expected exception; to test fault-tolerance")
case "Hello" channel ! "World"
case "Failure" throw new RuntimeException("Expected exception; to test fault-tolerance")
case "NoReply"
}
}
class TestDelayActor(await: StandardLatch) extends Actor {
def receive = {
case "Hello"
await.await
reply("World")
case "NoReply" { await.await }
case "Hello" await.await; channel ! "World"
case "NoReply" await.await
case "Failure"
await.await
throw new RuntimeException("Expected exception; to test fault-tolerance")
@ -140,7 +136,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"will return a result" must {
behave like futureWithResult { test
val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.toUpperCase) } })
val actor2 = actorOf(new Actor { def receive = { case s: String channel ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await
test(future, "WORLD")
@ -152,7 +148,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
behave like futureWithException[ArithmeticException] { test
filterException[ArithmeticException] {
val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.length / 0) } })
val actor2 = actorOf(new Actor { def receive = { case s: String channel ! s.length / 0 } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await
test(future, "/ by zero")
@ -165,7 +161,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
behave like futureWithException[MatchError] { test
filterException[MatchError] {
val actor1 = actorOf[TestActor]
val actor2 = actorOf(new Actor { def receive = { case s: String reply(s.toUpperCase) } })
val actor2 = actorOf(new Actor { def receive = { case s: String channel ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i }
future.await
test(future, "World (of class java.lang.String)")
@ -182,8 +178,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
filterException[ClassCastException] {
val actor = actorOf(new Actor {
def receive = {
case s: String reply(s.length)
case i: Int reply((i * 2).toString)
case s: String channel ! s.length
case i: Int channel ! (i * 2).toString
}
})
@ -214,8 +210,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case class Res[T](res: T)
val actor = actorOf(new Actor {
def receive = {
case Req(s: String) reply(Res(s.length))
case Req(i: Int) reply(Res((i * 2).toString))
case Req(s: String) channel ! Res(s.length)
case Req(i: Int) channel ! Res((i * 2).toString)
}
})
@ -301,7 +297,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"fold" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); channel.tryTell(add) }
})
}
val timeout = 10000
@ -312,7 +308,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"fold by composing" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); channel.tryTell(add) }
})
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
@ -327,7 +323,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
tryReply(add)
channel.tryTell(add)
}
})
}
@ -359,7 +355,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
"shouldReduceResults" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); tryReply(add) }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); channel.tryTell(add) }
})
}
val timeout = 10000
@ -375,7 +371,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
tryReply(add)
channel.tryTell(add)
}
})
}
@ -404,7 +400,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
var counter = 1
def receive = {
case 'GetNext
reply(counter)
channel ! counter
counter += 2
}
})

View file

@ -29,7 +29,7 @@ class PriorityDispatcherSpec extends AkkaSpec {
def receive = {
case i: Int acc = i :: acc
case 'Result tryReply(acc)
case 'Result channel.tryTell(acc)
}
}).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef]

View file

@ -41,7 +41,7 @@ class ActorPoolSpec extends AkkaSpec {
case _
count.incrementAndGet
latch.countDown()
tryReply("success")
channel.tryTell("success")
}
}))
@ -88,7 +88,7 @@ class ActorPoolSpec extends AkkaSpec {
def receive = {
case req: String {
sleepFor(10 millis)
tryReply("Response")
channel.tryTell("Response")
}
}
}))

View file

@ -73,7 +73,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
val actor = app.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case "hit" reply(id)
case "hit" channel ! id
case "end" doneLatch.countDown()
}
}), address)
@ -187,7 +187,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
val actor = app.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case "hit" reply(id)
case "hit" channel ! id
case "end" doneLatch.countDown()
}
}), address)

View file

@ -458,7 +458,7 @@ class RoutingSpec extends AkkaSpec {
case Stop(None) self.stop()
case Stop(Some(_id)) if (_id == id) self.stop()
case _id: Int if (_id == id)
case _ Thread sleep 100 * id; tryReply(id)
case _ Thread sleep 100 * id; channel.tryTell(id)
}
override def postStop = {

View file

@ -21,7 +21,7 @@ class Ticket703Spec extends AkkaSpec {
def receive = {
case req: String
Thread.sleep(6000L)
tryReply("Response")
channel.tryTell("Response")
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))

View file

@ -295,31 +295,6 @@ trait Actor {
*/
def receiveTimeout_=(timeout: Option[Long]) = context.receiveTimeout = timeout
/**
* Akka Scala & Java API
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed. This method fails if the original sender of the message could not be determined with an
* IllegalStateException.
*
* If you don't want deal with this IllegalStateException, but just a boolean, just use the <code>tryReply(...)</code>
* version.
*
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = channel.!(message)(self)
/**
* Akka Scala & Java API
* Use <code>tryReply(..)</code> to try reply with a message to the original sender of the message currently
* being processed. This method
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*
* If you would rather have an exception, check the <code>reply(..)</code> version.
*/
def tryReply(message: Any): Boolean = channel.tryTell(message)(self)
/**
* Same as ActorContext.children
*/
@ -341,7 +316,7 @@ trait Actor {
* def receive = {
* case Ping =&gt;
* println("got a 'Ping' message")
* reply("pong")
* channel ! "pong"
*
* case OneWay =&gt;
* println("got a 'OneWay' message")

View file

@ -68,7 +68,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
* Use this method with care. In most cases it is better to use 'tell' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().channel().tell(...)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] =

View file

@ -506,7 +506,7 @@ trait FSM[S, D] extends ListenerManagement {
nextState.stopReason match {
case None makeTransition(nextState)
case _
nextState.replies.reverse foreach reply
nextState.replies.reverse foreach { r channel ! r }
terminate(nextState)
self.stop()
}
@ -516,7 +516,7 @@ trait FSM[S, D] extends ListenerManagement {
if (!stateFunctions.contains(nextState.stateName)) {
terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
} else {
nextState.replies.reverse foreach reply
nextState.replies.reverse foreach { r channel ! r }
if (currentState.stateName != nextState.stateName) {
handleTransition(currentState.stateName, nextState.stateName)
notifyListeners(Transition(self, currentState.stateName, nextState.stateName))

View file

@ -281,7 +281,7 @@ class TypedActor(val app: AkkaApplication) {
case p: ActorPromise p completeWith m(me).asInstanceOf[Future[Any]]
case _ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply")
}
} else reply(m(me))
} else channel ! m(me)
} finally {
TypedActor.selfReference set null

View file

@ -21,7 +21,7 @@ import akka.dispatch.{ MessageDispatcher, Promise }
*
* if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'reply' method
* reply(msg + ":" + getSelf().getAddress());
* getContext().getChannel().tell(msg + ":" + getSelf().getAddress());
*
* } else if (msg.equals("UseSender") && getSender().isDefined()) {
* // Reply to original sender of message using the sender reference

View file

@ -103,7 +103,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
protected def _route(): Actor.Receive = {
// for testing...
case Stat
tryReply(Stats(_delegates length))
channel.tryTell(Stats(_delegates length))
case Terminated(victim, _)
_delegates = _delegates filterNot { victim == }
case msg

View file

@ -24,7 +24,7 @@ object TypedCamelTestSupport {
def countdown: Handler = {
case SetExpectedMessageCount(num) {
latch = new CountDownLatch(num)
reply(latch)
channel ! latch
}
case msg latch.countDown
}
@ -32,7 +32,7 @@ object TypedCamelTestSupport {
trait Respond { this: Actor
def respond: Handler = {
case msg: Message reply(response(msg))
case msg: Message channel ! response(msg)
}
def response(msg: Message): Any = "Hello %s" format msg.body
@ -42,8 +42,8 @@ object TypedCamelTestSupport {
val messages = Buffer[Any]()
def retain: Handler = {
case GetRetainedMessage reply(messages.last)
case GetRetainedMessages(p) reply(messages.toList.filter(p))
case GetRetainedMessage channel ! messages.last
case GetRetainedMessages(p) channel ! messages.filter(p).toList
case msg {
messages += msg
msg

View file

@ -127,11 +127,11 @@ private[camel] class ActivationTracker extends Actor {
def receive = {
case SetExpectedActivationCount(num) {
activationLatch = new CountDownLatch(num)
reply(activationLatch)
channel ! activationLatch
}
case SetExpectedDeactivationCount(num) {
deactivationLatch = new CountDownLatch(num)
reply(deactivationLatch)
channel ! deactivationLatch
}
case EndpointActivated activationLatch.countDown
case EndpointDeactivated deactivationLatch.countDown

View file

@ -159,7 +159,7 @@ trait ProducerSupport { this: Actor ⇒
* actor).
*/
protected def receiveAfterProduce: Receive = {
case msg if (!oneway) reply(msg)
case msg if (!oneway) channel ! msg
}
/**

View file

@ -15,7 +15,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
tryReply(String.format("%s %s", body, header));
channel.tryTell(String.format("%s %s", body, header));
}
}

View file

@ -17,7 +17,7 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
tryReply(String.format("%s %s", body, header));
channel.tryTell(String.format("%s %s", body, header));
}
}

View file

@ -36,7 +36,7 @@ object CamelTestSupport {
def countdown: Handler = {
case SetExpectedMessageCount(num) {
latch = new CountDownLatch(num)
reply(latch)
channel ! latch
}
case msg latch.countDown
}
@ -44,7 +44,7 @@ object CamelTestSupport {
trait Respond { this: Actor
def respond: Handler = {
case msg: Message reply(response(msg))
case msg: Message channel ! response(msg)
}
def response(msg: Message): Any = "Hello %s" format msg.body
@ -54,8 +54,8 @@ object CamelTestSupport {
val messages = Buffer[Any]()
def retain: Handler = {
case GetRetainedMessage reply(messages.last)
case GetRetainedMessages(p) reply(messages.toList.filter(p))
case GetRetainedMessage channel ! messages.last
case GetRetainedMessages(p) channel ! messages.filter(p).toList
case msg {
messages += msg
msg

View file

@ -211,7 +211,7 @@ object ConsumerScalaTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message reply("received %s" format msg.body)
case msg: Message channel ! "received %s" format msg.body
}
}
@ -226,7 +226,7 @@ object ConsumerScalaTest {
def endpointUri = uri
override def autoack = false
protected def receive = {
case msg: Message reply(Ack)
case msg: Message channel ! Ack
}
}
@ -247,15 +247,15 @@ object ConsumerScalaTest {
protected def receive = {
case "fail" { throw new Exception("test") }
case "succeed" reply("ok")
case "succeed" channel ! "ok"
}
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
tryReply("pr")
channel.tryTell("pr")
}
override def postStop {
tryReply("ps")
channel.tryTell("ps")
}
}
@ -288,7 +288,7 @@ object ConsumerScalaTest {
}
private def respondTo(msg: Message) =
if (valid) reply("accepted: %s" format msg.body)
if (valid) channel ! ("accepted: %s" format msg.body)
else throw new Exception("rejected: %s" format msg.body)
}

View file

@ -253,18 +253,16 @@ object ProducerFeatureTest {
class TestResponder extends Actor {
protected def receive = {
case msg: Message msg.body match {
case "fail" reply(Failure(new Exception("failure"), msg.headers))
case _ reply(msg.transformBody { body: String "received %s" format body })
case "fail" channel ! Failure(new Exception("failure"), msg.headers)
case _ channel ! (msg.transformBody { body: String "received %s" format body })
}
}
}
class ReplyingForwardTarget extends Actor {
protected def receive = {
case msg: Message
reply(msg.addHeader("test" -> "result"))
case msg: Failure
reply(Failure(msg.cause, msg.headers + ("test" -> "failure")))
case msg: Message channel ! msg.addHeader("test" -> "result")
case msg: Failure channel ! Failure(msg.cause, msg.headers + ("test" -> "failure"))
}
}

View file

@ -96,13 +96,13 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
object ActorComponentFeatureTest {
class CustomIdActor extends Actor {
protected def receive = {
case msg: Message reply("Received %s" format msg.body)
case msg: Message channel ! ("Received %s" format msg.body)
}
}
class FailWithMessage extends Actor {
protected def receive = {
case msg: Message reply(Failure(new Exception("test")))
case msg: Message channel ! Failure(new Exception("test"))
}
}

View file

@ -50,8 +50,7 @@ object Pi extends App {
//#calculatePiFor
def receive = {
case Work(start, nrOfElements)
reply(Result(calculatePiFor(start, nrOfElements))) // perform the work
case Work(start, nrOfElements) channel ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
//#worker

View file

@ -149,10 +149,10 @@ class RemoteDaemon(val remote: Remote) extends Actor {
app.eventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message))
}
reply(Success(address.toString))
channel ! Success(address.toString)
} catch {
case error: Throwable
reply(Failure(error))
channel ! Failure(error)
throw error
}
}
@ -184,7 +184,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
new LocalActorRef(app,
Props(
context {
case f: Function0[_] try { reply(f()) } finally { context.self.stop() }
case f: Function0[_] try { channel ! f() } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
@ -200,7 +200,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
new LocalActorRef(app,
Props(
context {
case (fun: Function[_, _], param: Any) try { reply(fun.asInstanceOf[Any Any](param)) } finally { context.self.stop() }
case (fun: Function[_, _], param: Any) try { channel ! fun.asInstanceOf[Any Any](param) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}

View file

@ -10,9 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
reply(app.nodename)
}
case "identify" channel ! app.nodename
}
}
}

View file

@ -8,9 +8,7 @@ object NewRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
reply(app.nodename)
}
case "identify" channel ! app.nodename
}
}
}

View file

@ -9,7 +9,7 @@ object RandomRoutedRemoteActorMultiJvmSpec {
val NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" reply(app.nodename)
case "hit" channel ! app.nodename
case "end" self.stop()
}
}

View file

@ -9,7 +9,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec {
val NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" reply(app.nodename)
case "hit" channel ! app.nodename
case "end" self.stop()
}
}

View file

@ -128,7 +128,7 @@ class MyJavaSerializableActor extends Actor with scala.Serializable {
def receive = {
case "hello"
count = count + 1
reply("world " + count)
channel ! "world " + count
}
}
@ -136,7 +136,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializabl
def receive = {
case "hello"
Thread.sleep(500)
case "hello-reply" reply("world")
case "hello-reply" channel ! "world"
}
}
@ -144,7 +144,7 @@ class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable
def receive = {
case m: Message
Thread.sleep(500)
case "hello-reply" reply("world")
case "hello-reply" channel ! "world"
}
}
@ -152,6 +152,6 @@ class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable {
def receive = {
case p: Person
Thread.sleep(500)
case "hello-reply" reply("hello")
case "hello-reply" channel ! "hello"
}
}

View file

@ -15,6 +15,6 @@ public class UntypedConsumer1 extends UntypedConsumerActor {
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
tryReply(String.format("received %s", body));
channel.tryTell(String.format("received %s", body));
}
}

View file

@ -12,7 +12,7 @@ class RemoteActor1 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1"
protected def receive = {
case msg: Message reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1")))
case msg: Message channel ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1"))
}
}
@ -23,7 +23,7 @@ class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2"
protected def receive = {
case msg: Message reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2")))
case msg: Message channel ! Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2"))
}
}
@ -44,7 +44,7 @@ class Consumer2 extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/default"
def receive = {
case msg: Message reply("Hello %s" format msg.bodyAs[String])
case msg: Message channel ! ("Hello %s" format msg.bodyAs[String])
}
}
@ -62,10 +62,10 @@ class Consumer4 extends Actor with Consumer {
def receive = {
case msg: Message msg.bodyAs[String] match {
case "stop" {
reply("Consumer4 stopped")
channel ! "Consumer4 stopped"
self.stop
}
case body reply(body)
case body channel ! body
}
}
}
@ -76,7 +76,7 @@ class Consumer5 extends Actor with Consumer {
def receive = {
case _ {
Actor.actorOf[Consumer4]
reply("Consumer4 started")
channel ! "Consumer4 started"
}
}
}
@ -106,7 +106,7 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu
protected def receive = {
case msg: Message {
publisher ! msg.bodyAs[String]
reply("message published")
channel ! "message published"
}
}
}
@ -135,8 +135,8 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer {
class HttpTransformer extends Actor {
protected def receive = {
case msg: Message reply(msg.transformBody { body: String body replaceAll ("Akka ", "AKKA ") })
case msg: Failure reply(msg)
case msg: Message channel ! (msg.transformBody { body: String body replaceAll ("Akka ", "AKKA ") })
case msg: Failure channel ! msg
}
}
@ -150,11 +150,11 @@ class FileConsumer extends Actor with Consumer {
case msg: Message {
if (counter == 2) {
println("received %s" format msg.bodyAs[String])
reply(Ack)
channel ! Ack
} else {
println("rejected %s" format msg.bodyAs[String])
counter += 1
reply(Failure(new Exception("message number %s not accepted" format counter)))
channel ! Failure(new Exception("message number %s not accepted" format counter))
}
}
}

View file

@ -15,7 +15,7 @@ public class SampleRemoteUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
tryReply(String.format("%s %s", body, header));
channel.tryTell(String.format("%s %s", body, header));
}
}

View file

@ -66,7 +66,7 @@ object HttpConcurrencyTestStress {
var correlationIds = Set[Any]()
override protected def receive = {
case "getCorrelationIdCount" reply(correlationIds.size)
case "getCorrelationIdCount" channel ! correlationIds.size
case msg super.receive(msg)
}
@ -93,7 +93,7 @@ object HttpConcurrencyTestStress {
class HttpServerWorker extends Actor {
protected def receive = {
case msg reply(msg)
case msg channel ! msg
}
}
}

View file

@ -94,8 +94,8 @@ object RemoteConsumerTest {
def endpointUri = "direct:remote-consumer"
protected def receive = {
case "init" reply("done")
case m: Message reply("remote actor: %s" format m.body)
case "init" channel ! "done"
case m: Message channel ! ("remote actor: %s" format m.body)
}
}
}

View file

@ -288,8 +288,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = {
case update: Update[_]
tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
case Get reply(agent.get)
channel.tryTell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
case Get channel ! agent.get
case _ ()
}
}
@ -302,7 +302,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = {
case update: Update[_] try {
tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
channel.tryTell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
} finally {
agent.resume()
self.stop()

View file

@ -32,7 +32,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
} else if (incoming instanceof String) {
String message = (String) incoming;
if (message.equals("GetCount")) {
reply(count.get());
getChannel().tell(count.get());
}
}
}

View file

@ -26,7 +26,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
reply(count.get());
getChannel().tell(count.get());
return true;
} else return false;
}

View file

@ -57,7 +57,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
} else if (incoming instanceof String) {
String message = (String) incoming;
if (message.equals("GetCount")) {
reply(count.get());
getChannel().tell(count.get());
}
}
}

View file

@ -70,7 +70,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
reply(count.get());
getChannel().tell(count.get());
return true;
} else return false;
}

View file

@ -34,7 +34,7 @@ object CoordinatedIncrement {
}
}
case GetCount reply(count.get)
case GetCount channel ! count.get
}
}

View file

@ -56,7 +56,7 @@ object FickleFriends {
}
}
case GetCount reply(count.get)
case GetCount channel ! count.get
}
}
@ -93,7 +93,7 @@ object FickleFriends {
}
}
case GetCount reply(count.get)
case GetCount channel ! count.get
}
}
}

View file

@ -49,7 +49,7 @@ object TransactorIncrement {
}
override def normally = {
case GetCount reply(count.get)
case GetCount channel ! count.get
}
}

View file

@ -224,14 +224,6 @@ class TestKit(_app: AkkaApplication) {
*/
def within[T](max: Duration)(f: T): T = within(0 seconds, max)(f)
/**
* Send reply to the last dequeued message. Will throw
* IllegalActorStateException if no message has been dequeued, yet. Dequeuing
* means reception of the message as part of an expect... or receive... call,
* not reception by the testActor.
*/
def reply(msg: AnyRef) { lastMessage.channel.!(msg)(testActor) }
/**
* Same as `expectMsg(remaining, obj)`, but correctly treating the timeFactor.
*/

View file

@ -48,14 +48,14 @@ object TestActorRefSpec {
val worker = TestActorRef(Props[WorkerActor])
worker ! channel
case "workDone" replyTo ! "complexReply"
case "simpleRequest" reply("simpleReply")
case "simpleRequest" channel ! "simpleReply"
}
}
class WorkerActor() extends TActor {
def receiveT = {
case "work" {
reply("workDone")
channel ! "workDone"
self.stop()
}
case replyTo: UntypedChannel {
@ -109,7 +109,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
"used with TestActorRef" in {
val a = TestActorRef(Props(new Actor {
val nested = TestActorRef(Props(self { case _ }))
def receive = { case _ reply(nested) }
def receive = { case _ channel ! nested }
}))
a must not be (null)
val nested = (a ? "any").as[ActorRef].get
@ -120,7 +120,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
"used with ActorRef" in {
val a = TestActorRef(Props(new Actor {
val nested = context.actorOf(Props(self { case _ }))
def receive = { case _ reply(nested) }
def receive = { case _ channel ! nested }
}))
a must not be (null)
val nested = (a ? "any").as[ActorRef].get

View file

@ -16,7 +16,7 @@ class TestProbeSpec extends AkkaSpec {
val tk = TestProbe()
val future = tk.ref ? "hello"
tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
tk.reply("world")
tk.lastMessage.channel ! "world"
future must be('completed)
future.get must equal("world")
}
@ -26,7 +26,7 @@ class TestProbeSpec extends AkkaSpec {
val tk2 = TestProbe()
tk1.ref.!("hello")(tk2.ref)
tk1.expectMsg(0 millis, "hello")
tk1.reply("world")
tk1.lastMessage.channel ! "world"
tk2.expectMsg(0 millis, "world")
}
@ -35,7 +35,7 @@ class TestProbeSpec extends AkkaSpec {
val probe2 = TestProbe()
probe1.send(probe2.ref, "hello")
probe2.expectMsg(0 millis, "hello")
probe2.reply("world")
probe2.lastMessage.channel ! "world"
probe1.expectMsg(0 millis, "world")
}

View file

@ -83,7 +83,7 @@ public class Pi {
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
// reply with the result
reply(new Result(result));
getChannel().tell(new Result(result));
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}

View file

@ -43,7 +43,7 @@ object Pi extends App {
def receive = {
case Work(start, nrOfElements)
reply(Result(calculatePiFor(start, nrOfElements))) // perform the work
channel ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}

View file

@ -80,7 +80,7 @@ public class Pi {
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
reply(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work
getChannel().tell(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
}

View file

@ -41,7 +41,7 @@ object Pi extends App {
def receive = {
case Work(arg, nrOfElements)
reply(Result(calculatePiFor(arg, nrOfElements))) // perform the work
channel ! Result(calculatePiFor(arg, nrOfElements)) // perform the work
}
}