avoid wrapping messages when ask returns single type

This commit is contained in:
Roland 2013-02-03 23:13:57 +01:00
parent 859589b9d4
commit 9b0c3a486f
5 changed files with 106 additions and 44 deletions

View file

@ -95,7 +95,7 @@ object ChannelSpec {
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
channel[T] { (x, snd) channel[T] { (x, snd)
val xx: WrappedMessage[T, Any] = x val xx: WrappedMessage[T, Any] = x
val f: Future[WrappedMessage[(ReplyChannels[T], Nothing) :+: TNil, ReplyChannels[T]]] = target <-?- x val f: Future[ReplyChannels[T]] = target <-?- x
f -!-> snd f -!-> snd
} }
import language.existentials import language.existentials
@ -415,7 +415,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"support typed ask" in { "support typed ask" in {
val t = ChannelExt(system).actorOf(new Tester, "t21") val t = ChannelExt(system).actorOf(new Tester, "t21")
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
val r: Future[C] = (t <-?- A).lub val r: Future[C] = t <-?- A
Await.result(r, 1.second) must be(C) Await.result(r, 1.second) must be(C)
} }
@ -471,12 +471,22 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
lastSender must be === t.actorRef lastSender must be === t.actorRef
} }
"not wrap Futures unnecessarily" in {
val a = ChannelExt(system).actorOf(new Tester, "t26a")
implicit val timeout = Timeout(1.second)
val c1: C = Await.result(a <-?- A, 1.second)
val c2: C = Await.result(A -?-> a, 1.second)
val fA = Future successful A
val c3: C = Await.result(a <-?- fA, 1.second)
val c4: C = Await.result(fA -?-> a, 1.second)
}
"be able to transform Futures" in { "be able to transform Futures" in {
val client = new ChannelRef[(Any, Nothing) :+: TNil](testActor) val client = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
val someActor = ChannelExt(system).actorOf(new Tester, "t26b") val someActor = ChannelExt(system).actorOf(new Tester, "t26b")
implicit val timeout = Timeout(1.second) implicit val timeout = Timeout(1.second)
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
A -?-> someActor -*-> (_ map (_.value match { case C B })) -?-> someActor -!-> client A -?-> someActor -*-> (_ map { case C B }) -?-> someActor -!-> client
expectMsg(D) expectMsg(D)
} }

View file

@ -32,7 +32,7 @@ class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
* which will be completed with the reply message or a TimeoutException. * which will be completed with the reply message or a TimeoutException.
* If the message is a Future itself, eventually send the Futures value. * If the message is a Future itself, eventually send the Futures value.
*/ */
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M] def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, Any, T, M]
/** /**
* Narrow this ChannelRef by removing channels or narrowing input types or * Narrow this ChannelRef by removing channels or narrowing input types or

View file

@ -21,11 +21,12 @@ object Ask {
def impl[ // def impl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply ReturnChannels <: ChannelList, // the precise type union describing the reply
ReturnLUB, // the least-upper bound for the reply types ReturnLUB, // the least-upper bound for the reply types
ReturnT, // the return type if it is just a single type
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
Msg: c.WeakTypeTag // the message being sent down the channel Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context { ](c: Context {
type PrefixType = ChannelRef[Channel] type PrefixType = ChannelRef[Channel]
})(msg: c.Expr[Msg]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { })(msg: c.Expr[Msg]): c.Expr[Future[_]] = {
import c.universe._ import c.universe._
val tpeChannel = weakTypeOf[Channel] val tpeChannel = weakTypeOf[Channel]
@ -41,19 +42,33 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) implicit lazy val ttReturn = c.TypeTag[ReturnT](out.head)
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) implicit lazy val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
if (isFuture) implicit lazy val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
out match {
case x :: Nil if isFuture
if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrappedNoWrap[ReturnT](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFutureNoWrap[ReturnT](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
case x :: Nil
reify(askOpsNoWrap[ReturnT](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
case _ if isFuture
if (unwrapped <:< typeOf[ChannelList]) if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice)) c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
else case _
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
} }
}
def opsImpl[ // def opsImpl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply ReturnChannels <: ChannelList, // the precise type union describing the reply
@ -62,7 +77,7 @@ object Ask {
Msg: c.WeakTypeTag // the message being sent down the channel Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context { ](c: Context {
type PrefixType = AnyOps[Msg] type PrefixType = AnyOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
import c.universe._ import c.universe._
val tpeChannel = weakTypeOf[Channel] val tpeChannel = weakTypeOf[Channel]
@ -72,12 +87,19 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
val msg = reify(c.prefix.splice.value)
out match {
case x :: Nil
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
reify(askOpsNoWrap[ReturnLUB](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
case _
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
val msg = reify(c.prefix.splice.value)
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
} }
}
// this is the implementation for Future[_] -?-> ChannelRef[_] // this is the implementation for Future[_] -?-> ChannelRef[_]
def futureImpl[ // def futureImpl[ //
@ -87,7 +109,7 @@ object Ask {
Msg: c.WeakTypeTag // the message being sent down the channel Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context { ](c: Context {
type PrefixType = FutureOps[Msg] type PrefixType = FutureOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
import c.universe._ import c.universe._
val tpeChannel = weakTypeOf[Channel] val tpeChannel = weakTypeOf[Channel]
@ -97,6 +119,16 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
out match {
case x :: Nil
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
reify(askFutureWrappedNoWrap[ReturnLUB](
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFutureNoWrap[ReturnLUB](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
case _
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (tpeMsg <:< typeOf[WrappedMessage[_, _]]) if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
@ -106,10 +138,11 @@ object Ask {
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice)) channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
} }
}
val wrapMessage = (m: Any) (new WrappedMessage[TNil, Any](m): Any) val wrapMessage = (m: Any) (new WrappedMessage[TNil, Any](m): Any)
@inline def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = { def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext implicit val ec = ExecutionContexts.sameThreadExecutionContext
akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]] akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]]
} }
@ -124,4 +157,17 @@ object Ask {
future flatMap (w akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]]) future flatMap (w akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]])
} }
def askOpsNoWrap[T](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] =
akka.pattern.ask(target, msg).asInstanceOf[Future[T]]
def askFutureNoWrap[T](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (m akka.pattern.ask(target, m).asInstanceOf[Future[T]])
}
def askFutureWrappedNoWrap[T](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (w akka.pattern.ask(target, w.value).asInstanceOf[Future[T]])
}
} }

View file

@ -57,10 +57,12 @@ class ChannelDocSpec extends AkkaSpec {
import ChannelDocSpec._ import ChannelDocSpec._
class MsgA trait Msg
class MsgB
class MsgC class MsgA extends Msg
class MsgD class MsgB extends Msg
class MsgC extends Msg
class MsgD extends Msg
"demonstrate why Typed Channels" in { "demonstrate why Typed Channels" in {
def someActor = testActor def someActor = testActor
@ -115,6 +117,7 @@ class ChannelDocSpec extends AkkaSpec {
implicit val timeout: Timeout = ??? // for the ask operations implicit val timeout: Timeout = ??? // for the ask operations
val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ??? val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ???
val channelA2: ChannelRef[(MsgA, MsgB) :+: (MsgA, MsgC) :+: TNil] = ???
val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ??? val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ???
val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ??? val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ???
@ -127,12 +130,15 @@ class ChannelDocSpec extends AkkaSpec {
channelA <-!- fA // eventually send the futures value to channelA channelA <-!- fA // eventually send the futures value to channelA
fA -!-> channelA // same thing as above fA -!-> channelA // same thing as above
// ask the actor; return type given in full for illustration val fB: Future[MsgB] = channelA <-?- a // ask the actor
val fB: Future[WrappedMessage[(MsgB, Nothing) :+: TNil, MsgB]] = channelA <-?- a
val fBunwrapped: Future[MsgB] = fB.lub
a -?-> channelA // same thing as above a -?-> channelA // same thing as above
// ask the actor with multiple reply types
// return type given in full for illustration
val fM: Future[WrappedMessage[ //
(MsgB, Nothing) :+: (MsgC, Nothing) :+: TNil, Msg]] = channelA2 <-?- a
val fMunwrapped: Future[Msg] = fM.lub
channelA <-?- fA // eventually ask the actor, return the future channelA <-?- fA // eventually ask the actor, return the future
fA -?-> channelA // same thing as above fA -?-> channelA // same thing as above

View file

@ -169,10 +169,10 @@ Operations on typed channels are composable and obey a few simple rules:
* the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same * the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same
thing provided the arguments also switch places thing provided the arguments also switch places
* sending with ``-?->`` or ``<-?-`` always returns a * sending with ``-?->`` or ``<-?-`` returns a ``Future[WrappedMessage[_, _]]``
``Future[WrappedMessage[_, _]]`` representing all possible reply channels, representing all possible reply channels if there is more than one (use
even if there is only one (use ``.lub`` to get a :class:`Future[_]` with the ``.lub`` to get a :class:`Future[_]` with the most precise single type for
most precise single type for the value) the value)
* sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new * sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new
:class:`Future[_]` which will be completed with the value after it has been :class:`Future[_]` which will be completed with the value after it has been