Ticket 964: rename of reply?

This commit is contained in:
Peter Veentjer 2011-07-17 09:02:36 +03:00
parent e7b33d46c2
commit 7983a66f68
14 changed files with 44 additions and 36 deletions

View file

@ -305,8 +305,8 @@ class ActorRefSpec extends WordSpec with MustMatchers {
val ref = Actor.actorOf(
new Actor {
def receive = {
case 5 self reply_? "five"
case null self reply_? "null"
case 5 self tryReply "five"
case null self tryReply "null"
}
}).start()

View file

@ -41,7 +41,7 @@ object SupervisorSpec {
def receive = {
case Ping
messageLog.put(PingMessage)
self.reply_?(PongMessage)
self.tryReply(PongMessage)
case Die
throw new RuntimeException(ExceptionMessage)
}
@ -361,7 +361,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
def receive = {
case Ping self.reply_?(PongMessage)
case Ping self.tryReply(PongMessage)
case Die throw new Exception("expected")
}
})

View file

@ -63,11 +63,11 @@ object Ticket669Spec {
}
override def preRestart(reason: scala.Throwable) {
self.reply_?("failure1")
self.tryReply("failure1")
}
override def postStop() {
self.reply_?("failure2")
self.tryReply("failure2")
}
}
}

View file

@ -20,7 +20,7 @@ object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class TryReply(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
@ -73,7 +73,7 @@ object ActorModelSpec {
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case TryReply(msg) ack; self.tryReply(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()

View file

@ -176,7 +176,7 @@ class FutureSpec extends JUnitSuite {
def shouldFoldResults {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
val timeout = 10000
@ -204,7 +204,7 @@ class FutureSpec extends JUnitSuite {
def shouldFoldResultsByComposing {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
@ -219,7 +219,7 @@ class FutureSpec extends JUnitSuite {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self reply_? add
self tryReply add
}
}).start()
}
@ -237,7 +237,7 @@ class FutureSpec extends JUnitSuite {
def shouldReduceResults {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
val timeout = 10000
@ -253,7 +253,7 @@ class FutureSpec extends JUnitSuite {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self reply_? add
self tryReply add
}
}).start()
}

View file

@ -33,7 +33,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
def receive = {
case i: Int acc = i :: acc
case 'Result self reply_? acc
case 'Result self tryReply acc
}
}).start()

View file

@ -192,7 +192,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
case _
count.incrementAndGet
latch.countDown()
self reply_? "success"
self tryReply "success"
}
}).start()
@ -241,7 +241,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
def receive = {
case req: String {
sleepFor(10 millis)
self.reply_?("Response")
self.tryReply("Response")
}
}
})

View file

@ -24,7 +24,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
def receive = {
case req: String
Thread.sleep(6000L)
self.reply_?("Response")
self.tryReply("Response")
}
})
}).start()

View file

@ -12,8 +12,6 @@ import akka.util._
import akka.serialization.{Serializer, Serialization}
import ReflectiveAccess._
import ClusterModule._
import DeploymentConfig.{TransactionLog TransactionLogConfig, _}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
@ -23,6 +21,7 @@ import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
import scala.annotation.tailrec
import java.lang.IllegalStateException
import akka.actor.DeploymentConfig.ReplicationScheme
private[akka] object ActorRefInternals {
@ -280,6 +279,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
@deprecated("will be removed in 2.0, use reply instead", "1.2")
def replyUnsafe(message: AnyRef) {
reply(message)
}
@ -291,7 +291,8 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def replySafe(message: AnyRef): Boolean = reply_?(message)
@deprecated("will be removed in 2.0, use tryReply instead", "1.2")
def replySafe(message: AnyRef): Boolean = tryReply(message)
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
@ -473,7 +474,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
Serialization.serializerFor(this.getClass).fold(x serializerErrorDueTo(x.toString), s s)
private lazy val replicationScheme: ReplicationScheme =
DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(DeploymentConfig.Transient)
private lazy val isReplicated: Boolean = DeploymentConfig.isReplicated(replicationScheme)
@ -1230,19 +1231,26 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
/**
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* being processed. This method fails if the original sender of the message could not be determined with an
* IllegalStateException.
*
* If you don't want deal with this IllegalStateException, but just a boolean, just use the <code>tryReply(...)</code>
* version.
*
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = channel.!(message)(this)
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* Use <code>tryReply(..)</code> to try reply with a message to the original sender of the message currently
* being processed. This method
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*
* If you would rather have an exception, check the <code>reply(..)</code> version.
*/
def reply_?(message: Any): Boolean = channel.safe_!(message)(this)
def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
}
case class SerializedActorRef(uuid: Uuid,

View file

@ -123,7 +123,7 @@ trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervision
protected def _route(): Receive = {
// for testing...
case Stat
self reply_? Stats(_delegates length)
self tryReply Stats(_delegates length)
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case Death(victim, _) =>

View file

@ -252,11 +252,11 @@ object ConsumerScalaTest {
}
override def preRestart(reason: scala.Throwable) {
self.reply_?("pr")
self.tryReply("pr")
}
override def postStop {
self.reply_?("ps")
self.tryReply("ps")
}
}

View file

@ -335,13 +335,13 @@ If you want to send a message back to the original sender of the message you jus
In this case the ``result`` will be send back to the Actor that sent the ``request``.
The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``reply_?`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to.
The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``tryReply`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to.
.. code-block:: scala
case request =>
val result = process(request)
if (self.reply_?(result)) ...// success
if (self.tryReply(result)) ...// success
else ... // handle failure
Summary of reply semantics and options

View file

@ -322,16 +322,16 @@ Supervised actors have the option to reply to the initial sender within preResta
}
override def preRestart(reason: scala.Throwable) {
self.reply_?(reason.getMessage)
self.tryReply(reason.getMessage)
}
override def postStop() {
self.reply_?("stopped by supervisor")
self.tryReply("stopped by supervisor")
}
}
- A reply within preRestart or postRestart must be a safe reply via `self.reply_?` because an unsafe self.reply will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via `self.reply_?` because an unsafe self.reply will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
- A reply within preRestart or postRestart must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
Handling too many actor restarts within a specific time limit
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View file

@ -283,7 +283,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = {
case update: Update[T]
self.reply_?(atomic(txFactory) { agent.ref alter update.function })
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
case Get self reply agent.get
case _ ()
}
@ -299,7 +299,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = {
case update: Update[T] try {
self.reply_?(atomic(txFactory) { agent.ref alter update.function })
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
} finally {
agent.resume
self.stop()