diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala
index 291cf0d40f..8ed25e2f9a 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala
@@ -7,14 +7,33 @@ package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.dispatch._
+import akka.testkit.TestActorRef
class ChannelSpec extends WordSpec with MustMatchers {
+
"A Channel" must {
+
"be contravariant" in {
val ap = new ActorPromise(1000)
val p: Promise[Any] = ap
val c: Channel[Any] = ap
val cs: Channel[String] = c
}
+
+ "find implicit sender actors" in {
+ var s: (String, UntypedChannel) = null
+ val ch = new Channel[String] {
+ def !(msg: String)(implicit sender: UntypedChannel) = { s = (msg, sender) }
+ }
+ val a = TestActorRef(new Actor {
+ def receive = {
+ case str: String ⇒ ch ! str
+ }
+ }).start()
+ a ! "hallo"
+ s must be(("hallo", a))
+ }
+
}
+
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 6f0d1ad2cf..56094892d2 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -71,7 +71,7 @@ private[akka] object ActorRefInternals {
*
* @author Jonas Bonér
*/
-abstract class ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
+abstract class ActorRef extends ActorRefShared with ForwardableChannel with ReplyChannel[Any] with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef ⇒
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
@@ -303,7 +303,7 @@ abstract class ActorRef extends ActorRefShared with ForwardableChannel with java
*
* If you would rather have an exception, check the reply(..) version.
*/
- def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
+ def tryReply(message: Any): Boolean = channel.tryTell(message, this)
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
@@ -386,20 +386,6 @@ abstract class ActorRef extends ActorRefShared with ForwardableChannel with java
else NullChannel
}
- /*
- * Implementation of ForwardableChannel
- */
-
- def sendException(ex: Throwable) {}
-
- def isUsableOnlyOnce = false
-
- def isUsable = true
-
- def isReplyable = true
-
- def canSendException = false
-
/**
* Java API.
* actor.tell(message);
@@ -76,13 +48,12 @@ trait Channel[-T] {
def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
/**
- * Java API.
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.
*
- * actor.tell(message);
+ * channel.tell(message);
*
*/
- def tellSafe(msg: T): Boolean = this.safe_!(msg)
+ def tryTell(msg: T): Boolean = this.tryTell(msg, NullChannel)
/**
* Java API.
@@ -93,27 +64,33 @@ trait Channel[-T] {
* actor.tell(message, context);
*
*/
- def tellSafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
+ def tryTell(msg: T, sender: UntypedChannel): Boolean = {
+ try {
+ this.!(msg)(sender)
+ true
+ } catch {
+ case _: Exception ⇒ false
+ }
+ }
}
/**
- * This trait represents a channel that a priori does have sending capability,
+ * This trait marks a channel that a priori does have sending capability,
* i.e. ! is not guaranteed to fail (e.g. NullChannel would be a
* counter-example).
*/
-trait AvailableChannel[-T] { self: Channel[T] ⇒
- def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean = {
- if (isUsable) {
- try {
- this ! msg
- true
- } catch {
- case _ ⇒ false
- }
- } else false
- }
-}
+trait AvailableChannel[-T] extends Channel[T]
+
+/**
+ * This trait marks a channel which is capable of sending exceptions.
+ */
+trait ExceptionChannel[-T] extends AvailableChannel[T]
+
+/**
+ * This trait marks a channel which carries reply information when tell()ing.
+ */
+trait ReplyChannel[-T] extends AvailableChannel[T]
/**
* All channels used in conjunction with MessageInvocation are untyped by
@@ -129,14 +106,13 @@ object UntypedChannel {
}
implicit final val default: UntypedChannel = NullChannel
-
}
/**
* Default channel when none available.
*/
case object NullChannel extends UntypedChannel {
- def !(msg: Any)(implicit channel: UntypedChannel = NullChannel) {
+ def !(msg: Any)(implicit channel: UntypedChannel) {
throw new IllegalActorStateException("""
No sender in scope, can't reply.
You have probably:
@@ -144,12 +120,7 @@ case object NullChannel extends UntypedChannel {
2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
You may want to have a look at safe_! for a variant returning a Boolean""")
}
- def safe_!(msg: Any)(implicit channel: UntypedChannel = NullChannel): Boolean = false
- def sendException(ex: Throwable) {}
- def isUsableOnlyOnce = false
- def isUsable = false
- def isReplyable = false
- def canSendException = false
+ def tryTell(msg: Any)(implicit channel: UntypedChannel, dummy: Int = 0): Boolean = false
}
/**
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index a36c21ee57..1f7aa96f68 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -6,7 +6,7 @@ package akka.dispatch
import akka.AkkaException
import akka.event.EventHandler
-import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler, Timeout }
+import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler, Timeout, ExceptionChannel }
import akka.util.{ Duration, BoxedType }
import akka.japi.{ Procedure, Function ⇒ JFunc }
@@ -1012,21 +1012,17 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
}
-class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel {
+class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel with ExceptionChannel[Any] {
- def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message)
+ def !(message: Any)(implicit channel: UntypedChannel) = completeWithResult(message)
- def sendException(ex: Throwable) = completeWithException(ex)
+ override def sendException(ex: Throwable) = {
+ completeWithException(ex)
+ value == Some(Left(ex))
+ }
def channel: UntypedChannel = this
- def isUsableOnlyOnce = true
- def isUsable = !isCompleted
- def isReplyable = false
- def canSendException = true
-
- @deprecated("ActorPromise merged with Channel[Any], just use 'this'", "1.2")
- def future = this
}
object ActorPromise {
@@ -1034,7 +1030,10 @@ object ActorPromise {
new ActorPromise(f.timeout) {
completeWith(f)
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
- override def sendException(ex: Throwable) = f completeWithException ex
+ override def sendException(ex: Throwable) = {
+ f completeWithException ex
+ f.value == Some(Left(ex))
+ }
}
}
diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst
index 5fbe3c7a81..5e3ab07a81 100644
--- a/akka-docs/java/untyped-actors.rst
+++ b/akka-docs/java/untyped-actors.rst
@@ -240,7 +240,7 @@ which you do by Channel.tell(msg)
String msg = (String)message;
if (msg.equals("Hello")) {
// Reply to original sender of message using the channel
- getContext().channel().tellSafe(msg + " from " + getContext().getUuid());
+ getContext().channel().tryTell(msg + " from " + getContext().getUuid());
}
}
}