From 7983a66f681468432171d6c4391682c2840ba02c Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Sun, 17 Jul 2011 09:02:36 +0300 Subject: [PATCH] Ticket 964: rename of reply? --- .../scala/akka/actor/actor/ActorRefSpec.scala | 4 ++-- .../actor/supervisor/SupervisorSpec.scala | 4 ++-- .../akka/actor/supervisor/Ticket669Spec.scala | 4 ++-- .../scala/akka/dispatch/ActorModelSpec.scala | 4 ++-- .../test/scala/akka/dispatch/FutureSpec.scala | 10 ++++---- .../dispatch/PriorityDispatcherSpec.scala | 2 +- .../test/scala/akka/routing/RoutingSpec.scala | 4 ++-- .../scala/akka/ticket/Ticket703Spec.scala | 2 +- .../src/main/scala/akka/actor/ActorRef.scala | 24 ++++++++++++------- .../src/main/scala/akka/routing/Pool.scala | 2 +- .../scala/akka/camel/ConsumerScalaTest.scala | 4 ++-- akka-docs/scala/actors.rst | 4 ++-- akka-docs/scala/fault-tolerance.rst | 8 +++---- .../src/main/scala/akka/agent/Agent.scala | 4 ++-- 14 files changed, 44 insertions(+), 36 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 28c68b15eb..95f5f29937 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -305,8 +305,8 @@ class ActorRefSpec extends WordSpec with MustMatchers { val ref = Actor.actorOf( new Actor { def receive = { - case 5 ⇒ self reply_? "five" - case null ⇒ self reply_? "null" + case 5 ⇒ self tryReply "five" + case null ⇒ self tryReply "null" } }).start() diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index d82905b8cf..46573f7799 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -41,7 +41,7 @@ object SupervisorSpec { def receive = { case Ping ⇒ messageLog.put(PingMessage) - self.reply_?(PongMessage) + self.tryReply(PongMessage) case Die ⇒ throw new RuntimeException(ExceptionMessage) } @@ -361,7 +361,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!") def receive = { - case Ping ⇒ self.reply_?(PongMessage) + case Ping ⇒ self.tryReply(PongMessage) case Die ⇒ throw new Exception("expected") } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index bddad26176..0976063b3b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -63,11 +63,11 @@ object Ticket669Spec { } override def preRestart(reason: scala.Throwable) { - self.reply_?("failure1") + self.tryReply("failure1") } override def postStop() { - self.reply_?("failure2") + self.tryReply("failure2") } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 62297ca495..327d6dda10 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -20,7 +20,7 @@ object ActorModelSpec { sealed trait ActorModelMessage - case class Reply_?(expect: Any) extends ActorModelMessage + case class TryReply(expect: Any) extends ActorModelMessage case class Reply(expect: Any) extends ActorModelMessage @@ -73,7 +73,7 @@ object ActorModelSpec { case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff() - case Reply_?(msg) ⇒ ack; self.reply_?(msg); busy.switchOff() + case TryReply(msg) ⇒ ack; self.tryReply(msg); busy.switchOff() case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index b0766121bb..238a1cc612 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -176,7 +176,7 @@ class FutureSpec extends JUnitSuite { def shouldFoldResults { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self tryReply add } }).start() } val timeout = 10000 @@ -204,7 +204,7 @@ class FutureSpec extends JUnitSuite { def shouldFoldResultsByComposing { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self tryReply add } }).start() } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } @@ -219,7 +219,7 @@ class FutureSpec extends JUnitSuite { case (add: Int, wait: Int) ⇒ Thread.sleep(wait) if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - self reply_? add + self tryReply add } }).start() } @@ -237,7 +237,7 @@ class FutureSpec extends JUnitSuite { def shouldReduceResults { val actors = (1 to 10).toList map { _ ⇒ actorOf(new Actor { - def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add } + def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self tryReply add } }).start() } val timeout = 10000 @@ -253,7 +253,7 @@ class FutureSpec extends JUnitSuite { case (add: Int, wait: Int) ⇒ Thread.sleep(wait) if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - self reply_? add + self tryReply add } }).start() } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 5ddd9fa411..c2fbc2d09c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -33,7 +33,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers { def receive = { case i: Int ⇒ acc = i :: acc - case 'Result ⇒ self reply_? acc + case 'Result ⇒ self tryReply acc } }).start() diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index b6d7777a95..84c7c3ea4f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -192,7 +192,7 @@ class RoutingSpec extends WordSpec with MustMatchers { case _ ⇒ count.incrementAndGet latch.countDown() - self reply_? "success" + self tryReply "success" } }).start() @@ -241,7 +241,7 @@ class RoutingSpec extends WordSpec with MustMatchers { def receive = { case req: String ⇒ { sleepFor(10 millis) - self.reply_?("Response") + self.tryReply("Response") } } }) diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index fd9bc4d1f7..849070752b 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -24,7 +24,7 @@ class Ticket703Spec extends WordSpec with MustMatchers { def receive = { case req: String ⇒ Thread.sleep(6000L) - self.reply_?("Response") + self.tryReply("Response") } }) }).start() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 084ca12994..24fe2b373b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -12,8 +12,6 @@ import akka.util._ import akka.serialization.{Serializer, Serialization} import ReflectiveAccess._ import ClusterModule._ -import DeploymentConfig.{TransactionLog ⇒ TransactionLogConfig, _} - import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit} @@ -23,6 +21,7 @@ import scala.reflect.BeanProperty import scala.collection.immutable.Stack import scala.annotation.tailrec import java.lang.IllegalStateException +import akka.actor.DeploymentConfig.ReplicationScheme private[akka] object ActorRefInternals { @@ -280,6 +279,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com *

* Throws an IllegalStateException if unable to determine what to reply to. */ + @deprecated("will be removed in 2.0, use reply instead", "1.2") def replyUnsafe(message: AnyRef) { reply(message) } @@ -291,7 +291,8 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com *

* Returns true if reply was sent, and false if unable to determine what to reply to. */ - def replySafe(message: AnyRef): Boolean = reply_?(message) + @deprecated("will be removed in 2.0, use tryReply instead", "1.2") + def replySafe(message: AnyRef): Boolean = tryReply(message) /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. @@ -473,7 +474,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s) private lazy val replicationScheme: ReplicationScheme = - DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient) + DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(DeploymentConfig.Transient) private lazy val isReplicated: Boolean = DeploymentConfig.isReplicated(replicationScheme) @@ -1230,19 +1231,26 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { /** * Use self.reply(..) to reply with a message to the original sender of the message currently - * being processed. + * being processed. This method fails if the original sender of the message could not be determined with an + * IllegalStateException. + * + * If you don't want deal with this IllegalStateException, but just a boolean, just use the tryReply(...) + * version. + * *

* Throws an IllegalStateException if unable to determine what to reply to. */ def reply(message: Any) = channel.!(message)(this) /** - * Use reply_?(..) to reply with a message to the original sender of the message currently - * being processed. + * Use tryReply(..) to try reply with a message to the original sender of the message currently + * being processed. This method *

* Returns true if reply was sent, and false if unable to determine what to reply to. + * + * If you would rather have an exception, check the reply(..) version. */ - def reply_?(message: Any): Boolean = channel.safe_!(message)(this) + def tryReply(message: Any): Boolean = channel.safe_!(message)(this) } case class SerializedActorRef(uuid: Uuid, diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index ef6db353a4..6398ef2241 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -123,7 +123,7 @@ trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervision protected def _route(): Receive = { // for testing... case Stat ⇒ - self reply_? Stats(_delegates length) + self tryReply Stats(_delegates length) case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } case Death(victim, _) => diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index 61909b0db0..a6a6971d98 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -252,11 +252,11 @@ object ConsumerScalaTest { } override def preRestart(reason: scala.Throwable) { - self.reply_?("pr") + self.tryReply("pr") } override def postStop { - self.reply_?("ps") + self.tryReply("ps") } } diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 20ceda4285..96db03ac11 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -335,13 +335,13 @@ If you want to send a message back to the original sender of the message you jus In this case the ``result`` will be send back to the Actor that sent the ``request``. -The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``reply_?`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to. +The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``tryReply`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to. .. code-block:: scala case request => val result = process(request) - if (self.reply_?(result)) ...// success + if (self.tryReply(result)) ...// success else ... // handle failure Summary of reply semantics and options diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index c7ac83fd7e..b610bff96f 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -322,16 +322,16 @@ Supervised actors have the option to reply to the initial sender within preResta } override def preRestart(reason: scala.Throwable) { - self.reply_?(reason.getMessage) + self.tryReply(reason.getMessage) } override def postStop() { - self.reply_?("stopped by supervisor") + self.tryReply("stopped by supervisor") } } -- A reply within preRestart or postRestart must be a safe reply via `self.reply_?` because an unsafe self.reply will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies. -- A reply within postStop must be a safe reply via `self.reply_?` because an unsafe self.reply will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all). +- A reply within preRestart or postRestart must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies. +- A reply within postStop must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all). Handling too many actor restarts within a specific time limit ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index fb525cc376..3a97263369 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -283,7 +283,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor { def receive = { case update: Update[T] ⇒ - self.reply_?(atomic(txFactory) { agent.ref alter update.function }) + self.tryReply(atomic(txFactory) { agent.ref alter update.function }) case Get ⇒ self reply agent.get case _ ⇒ () } @@ -299,7 +299,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { def receive = { case update: Update[T] ⇒ try { - self.reply_?(atomic(txFactory) { agent.ref alter update.function }) + self.tryReply(atomic(txFactory) { agent.ref alter update.function }) } finally { agent.resume self.stop()