clean up Channel API (fixes #1070)
- remove feature query methods - encode information in marker traits - remove safe_! - rename tellSafe to tryTell (harmonize with ActorRef.reply)
This commit is contained in:
parent
04729bcbc3
commit
a43418a4c3
5 changed files with 62 additions and 87 deletions
|
|
@ -7,14 +7,33 @@ package akka.actor
|
||||||
import org.scalatest.WordSpec
|
import org.scalatest.WordSpec
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
|
import akka.testkit.TestActorRef
|
||||||
|
|
||||||
class ChannelSpec extends WordSpec with MustMatchers {
|
class ChannelSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
"A Channel" must {
|
"A Channel" must {
|
||||||
|
|
||||||
"be contravariant" in {
|
"be contravariant" in {
|
||||||
val ap = new ActorPromise(1000)
|
val ap = new ActorPromise(1000)
|
||||||
val p: Promise[Any] = ap
|
val p: Promise[Any] = ap
|
||||||
val c: Channel[Any] = ap
|
val c: Channel[Any] = ap
|
||||||
val cs: Channel[String] = c
|
val cs: Channel[String] = c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"find implicit sender actors" in {
|
||||||
|
var s: (String, UntypedChannel) = null
|
||||||
|
val ch = new Channel[String] {
|
||||||
|
def !(msg: String)(implicit sender: UntypedChannel) = { s = (msg, sender) }
|
||||||
}
|
}
|
||||||
|
val a = TestActorRef(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case str: String ⇒ ch ! str
|
||||||
|
}
|
||||||
|
}).start()
|
||||||
|
a ! "hallo"
|
||||||
|
s must be(("hallo", a))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ private[akka] object ActorRefInternals {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
abstract class ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
|
abstract class ActorRef extends ActorRefShared with ForwardableChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable {
|
||||||
scalaRef: ScalaActorRef ⇒
|
scalaRef: ScalaActorRef ⇒
|
||||||
// Only mutable for RemoteServer in order to maintain identity across nodes
|
// Only mutable for RemoteServer in order to maintain identity across nodes
|
||||||
@volatile
|
@volatile
|
||||||
|
|
@ -303,7 +303,7 @@ abstract class ActorRef extends ActorRefShared with ForwardableChannel with java
|
||||||
*
|
*
|
||||||
* If you would rather have an exception, check the <code>reply(..)</code> version.
|
* If you would rather have an exception, check the <code>reply(..)</code> version.
|
||||||
*/
|
*/
|
||||||
def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
|
def tryReply(message: Any): Boolean = channel.tryTell(message, this)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
|
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
|
||||||
|
|
@ -386,20 +386,6 @@ abstract class ActorRef extends ActorRefShared with ForwardableChannel with java
|
||||||
else NullChannel
|
else NullChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Implementation of ForwardableChannel
|
|
||||||
*/
|
|
||||||
|
|
||||||
def sendException(ex: Throwable) {}
|
|
||||||
|
|
||||||
def isUsableOnlyOnce = false
|
|
||||||
|
|
||||||
def isUsable = true
|
|
||||||
|
|
||||||
def isReplyable = true
|
|
||||||
|
|
||||||
def canSendException = false
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API. <p/>
|
* Java API. <p/>
|
||||||
* Abstraction for unification of sender and senderFuture for later reply
|
* Abstraction for unification of sender and senderFuture for later reply
|
||||||
|
|
@ -1129,7 +1115,7 @@ trait ActorRefShared {
|
||||||
* There are implicit conversions in ../actor/Implicits.scala
|
* There are implicit conversions in ../actor/Implicits.scala
|
||||||
* from ActorRef -> ScalaActorRef and back
|
* from ActorRef -> ScalaActorRef and back
|
||||||
*/
|
*/
|
||||||
trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
|
trait ScalaActorRef extends ActorRefShared with ForwardableChannel with ReplyChannel[Any] {
|
||||||
ref: ActorRef ⇒
|
ref: ActorRef ⇒
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -18,45 +18,17 @@ trait Channel[-T] {
|
||||||
* Scala API. <p/>
|
* Scala API. <p/>
|
||||||
* Sends the specified message to the channel.
|
* Sends the specified message to the channel.
|
||||||
*/
|
*/
|
||||||
def !(msg: T)(implicit channel: UntypedChannel = NullChannel): Unit
|
def !(msg: T)(implicit channel: UntypedChannel): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to send an exception. Not all channel types support this, one notable
|
* Try to send an exception. Not all channel types support this, one notable
|
||||||
* positive example is Future. Failure to send is silent.
|
* positive example is Future. Failure to send is silent.
|
||||||
|
*
|
||||||
|
* @return whether sending was successful
|
||||||
*/
|
*/
|
||||||
def sendException(ex: Throwable): Unit
|
def sendException(ex: Throwable): Boolean = false
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API.<p/>
|
|
||||||
* Try to send message to the channel, return whether successful.
|
|
||||||
*/
|
|
||||||
def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether this channel may be used only once, e.g. a Future.
|
|
||||||
*/
|
|
||||||
def isUsableOnlyOnce: Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether this channel may still be used (only useful if
|
|
||||||
* isUsableOnlyOnce returns true).
|
|
||||||
*/
|
|
||||||
def isUsable: Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether this channel carries reply information, e.g. an
|
|
||||||
* ActorRef.
|
|
||||||
*/
|
|
||||||
def isReplyable: Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether this channel is capable of sending exceptions to its
|
|
||||||
* recipient.
|
|
||||||
*/
|
|
||||||
def canSendException: Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Java API.<p/>
|
|
||||||
* Sends the specified message to the channel, i.e. fire-and-forget semantics.<p/>
|
* Sends the specified message to the channel, i.e. fire-and-forget semantics.<p/>
|
||||||
* <pre>
|
* <pre>
|
||||||
* actor.tell(message);
|
* actor.tell(message);
|
||||||
|
|
@ -76,13 +48,12 @@ trait Channel[-T] {
|
||||||
def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
|
def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API.<p/>
|
|
||||||
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.<p/>
|
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.<p/>
|
||||||
* <pre>
|
* <pre>
|
||||||
* actor.tell(message);
|
* channel.tell(message);
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
def tellSafe(msg: T): Boolean = this.safe_!(msg)
|
def tryTell(msg: T): Boolean = this.tryTell(msg, NullChannel)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API. <p/>
|
* Java API. <p/>
|
||||||
|
|
@ -93,27 +64,33 @@ trait Channel[-T] {
|
||||||
* actor.tell(message, context);
|
* actor.tell(message, context);
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
def tellSafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
|
def tryTell(msg: T, sender: UntypedChannel): Boolean = {
|
||||||
|
try {
|
||||||
|
this.!(msg)(sender)
|
||||||
|
true
|
||||||
|
} catch {
|
||||||
|
case _: Exception ⇒ false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This trait represents a channel that a priori does have sending capability,
|
* This trait marks a channel that a priori does have sending capability,
|
||||||
* i.e. ! is not guaranteed to fail (e.g. NullChannel would be a
|
* i.e. ! is not guaranteed to fail (e.g. NullChannel would be a
|
||||||
* counter-example).
|
* counter-example).
|
||||||
*/
|
*/
|
||||||
trait AvailableChannel[-T] { self: Channel[T] ⇒
|
trait AvailableChannel[-T] extends Channel[T]
|
||||||
def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean = {
|
|
||||||
if (isUsable) {
|
/**
|
||||||
try {
|
* This trait marks a channel which is capable of sending exceptions.
|
||||||
this ! msg
|
*/
|
||||||
true
|
trait ExceptionChannel[-T] extends AvailableChannel[T]
|
||||||
} catch {
|
|
||||||
case _ ⇒ false
|
/**
|
||||||
}
|
* This trait marks a channel which carries reply information when tell()ing.
|
||||||
} else false
|
*/
|
||||||
}
|
trait ReplyChannel[-T] extends AvailableChannel[T]
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All channels used in conjunction with MessageInvocation are untyped by
|
* All channels used in conjunction with MessageInvocation are untyped by
|
||||||
|
|
@ -129,14 +106,13 @@ object UntypedChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit final val default: UntypedChannel = NullChannel
|
implicit final val default: UntypedChannel = NullChannel
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default channel when none available.
|
* Default channel when none available.
|
||||||
*/
|
*/
|
||||||
case object NullChannel extends UntypedChannel {
|
case object NullChannel extends UntypedChannel {
|
||||||
def !(msg: Any)(implicit channel: UntypedChannel = NullChannel) {
|
def !(msg: Any)(implicit channel: UntypedChannel) {
|
||||||
throw new IllegalActorStateException("""
|
throw new IllegalActorStateException("""
|
||||||
No sender in scope, can't reply.
|
No sender in scope, can't reply.
|
||||||
You have probably:
|
You have probably:
|
||||||
|
|
@ -144,12 +120,7 @@ case object NullChannel extends UntypedChannel {
|
||||||
2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
|
2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
|
||||||
You may want to have a look at safe_! for a variant returning a Boolean""")
|
You may want to have a look at safe_! for a variant returning a Boolean""")
|
||||||
}
|
}
|
||||||
def safe_!(msg: Any)(implicit channel: UntypedChannel = NullChannel): Boolean = false
|
def tryTell(msg: Any)(implicit channel: UntypedChannel, dummy: Int = 0): Boolean = false
|
||||||
def sendException(ex: Throwable) {}
|
|
||||||
def isUsableOnlyOnce = false
|
|
||||||
def isUsable = false
|
|
||||||
def isReplyable = false
|
|
||||||
def canSendException = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.dispatch
|
||||||
|
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.event.EventHandler
|
import akka.event.EventHandler
|
||||||
import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler, Timeout }
|
import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler, Timeout, ExceptionChannel }
|
||||||
import akka.util.{ Duration, BoxedType }
|
import akka.util.{ Duration, BoxedType }
|
||||||
import akka.japi.{ Procedure, Function ⇒ JFunc }
|
import akka.japi.{ Procedure, Function ⇒ JFunc }
|
||||||
|
|
||||||
|
|
@ -1012,21 +1012,17 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
|
||||||
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
||||||
}
|
}
|
||||||
|
|
||||||
class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel {
|
class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel with ExceptionChannel[Any] {
|
||||||
|
|
||||||
def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message)
|
def !(message: Any)(implicit channel: UntypedChannel) = completeWithResult(message)
|
||||||
|
|
||||||
def sendException(ex: Throwable) = completeWithException(ex)
|
override def sendException(ex: Throwable) = {
|
||||||
|
completeWithException(ex)
|
||||||
|
value == Some(Left(ex))
|
||||||
|
}
|
||||||
|
|
||||||
def channel: UntypedChannel = this
|
def channel: UntypedChannel = this
|
||||||
|
|
||||||
def isUsableOnlyOnce = true
|
|
||||||
def isUsable = !isCompleted
|
|
||||||
def isReplyable = false
|
|
||||||
def canSendException = true
|
|
||||||
|
|
||||||
@deprecated("ActorPromise merged with Channel[Any], just use 'this'", "1.2")
|
|
||||||
def future = this
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object ActorPromise {
|
object ActorPromise {
|
||||||
|
|
@ -1034,7 +1030,10 @@ object ActorPromise {
|
||||||
new ActorPromise(f.timeout) {
|
new ActorPromise(f.timeout) {
|
||||||
completeWith(f)
|
completeWith(f)
|
||||||
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
|
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
|
||||||
override def sendException(ex: Throwable) = f completeWithException ex
|
override def sendException(ex: Throwable) = {
|
||||||
|
f completeWithException ex
|
||||||
|
f.value == Some(Left(ex))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -240,7 +240,7 @@ which you do by Channel.tell(msg)
|
||||||
String msg = (String)message;
|
String msg = (String)message;
|
||||||
if (msg.equals("Hello")) {
|
if (msg.equals("Hello")) {
|
||||||
// Reply to original sender of message using the channel
|
// Reply to original sender of message using the channel
|
||||||
getContext().channel().tellSafe(msg + " from " + getContext().getUuid());
|
getContext().channel().tryTell(msg + " from " + getContext().getUuid());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue