Merge pull request #1108 from akka/wip-channel-transform-∂π

Wip channel transform ∂π
This commit is contained in:
Roland Kuhn 2013-02-06 05:43:04 -08:00
commit 8a259897f8
7 changed files with 116 additions and 44 deletions

View file

@ -95,7 +95,7 @@ object ChannelSpec {
implicit val timeout = Timeout(1.second)
channel[T] { (x, snd)
val xx: WrappedMessage[T, Any] = x
val f: Future[WrappedMessage[(ReplyChannels[T], Nothing) :+: TNil, ReplyChannels[T]]] = target <-?- x
val f: Future[ReplyChannels[T]] = target <-?- x
f -!-> snd
}
import language.existentials
@ -415,7 +415,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
"support typed ask" in {
val t = ChannelExt(system).actorOf(new Tester, "t21")
implicit val timeout = Timeout(1.second)
val r: Future[C] = (t <-?- A).lub
val r: Future[C] = t <-?- A
Await.result(r, 1.second) must be(C)
}
@ -471,6 +471,25 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
lastSender must be === t.actorRef
}
"not wrap Futures unnecessarily" in {
val a = ChannelExt(system).actorOf(new Tester, "t26a")
implicit val timeout = Timeout(1.second)
val c1: C = Await.result(a <-?- A, 1.second)
val c2: C = Await.result(A -?-> a, 1.second)
val fA = Future successful A
val c3: C = Await.result(a <-?- fA, 1.second)
val c4: C = Await.result(fA -?-> a, 1.second)
}
"be able to transform Futures" in {
val client = new ChannelRef[(Any, Nothing) :+: TNil](testActor)
val someActor = ChannelExt(system).actorOf(new Tester, "t26b")
implicit val timeout = Timeout(1.second)
implicit val ec = system.dispatcher
A -?-> someActor -*-> (_ map { case C B }) -?-> someActor -!-> client
expectMsg(D)
}
}
"A WrappedMessage" must {

View file

@ -32,7 +32,7 @@ class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
* which will be completed with the reply message or a TimeoutException.
* If the message is a Future itself, eventually send the Futures value.
*/
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M]
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, Any, T, M]
/**
* Narrow this ChannelRef by removing channels or narrowing input types or

View file

@ -104,7 +104,7 @@ trait Channels[P <: ChannelList, C <: ChannelList] { this: Actor ⇒
else
F2(recv.asInstanceOf[(Any, ChannelRef[ChannelList]) Unit])
def apply(recv: R): Unit = {
val tt = implicitly[ru.TypeTag[Ch]]
val tt = ru.typeTag[Ch]
behavior ++= (for (t inputChannels(ru)(tt.tpe)) yield tt.mirror.runtimeClass(t.widen) -> ff(recv))
}
}

View file

@ -56,6 +56,7 @@ class ActorRefOps(val ref: ActorRef) extends AnyVal {
class FutureOps[T](val future: Future[T]) extends AnyVal {
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureOpsImpl[C, T]
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[ChannelList, Any, C, T]
def -*->[U](f: Future[T] Future[U]): Future[U] = f(future)
def lub[LUB](implicit ev: T <:< WrappedMessage[_, LUB]): Future[LUB] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future map (ev(_).value)

View file

@ -21,11 +21,12 @@ object Ask {
def impl[ //
ReturnChannels <: ChannelList, // the precise type union describing the reply
ReturnLUB, // the least-upper bound for the reply types
ReturnT, // the return type if it is just a single type
Channel <: ChannelList: c.WeakTypeTag, // the channel being asked
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = ChannelRef[Channel]
})(msg: c.Expr[Msg]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
})(msg: c.Expr[Msg]): c.Expr[Future[_]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
@ -41,18 +42,32 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (isFuture)
if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
else
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
implicit lazy val ttReturn = c.TypeTag[ReturnT](out.head)
implicit lazy val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit lazy val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
out match {
case x :: Nil if isFuture
if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrappedNoWrap[ReturnT](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFutureNoWrap[ReturnT](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
case x :: Nil
reify(askOpsNoWrap[ReturnT](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
case _ if isFuture
if (unwrapped <:< typeOf[ChannelList])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
case _
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
}
}
def opsImpl[ //
@ -62,7 +77,7 @@ object Ask {
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = AnyOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
@ -72,11 +87,18 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
val msg = reify(c.prefix.splice.value)
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
out match {
case x :: Nil
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
reify(askOpsNoWrap[ReturnLUB](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
case _
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
}
}
// this is the implementation for Future[_] -?-> ChannelRef[_]
@ -87,7 +109,7 @@ object Ask {
Msg: c.WeakTypeTag // the message being sent down the channel
](c: Context {
type PrefixType = FutureOps[Msg]
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = {
})(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = {
import c.universe._
val tpeChannel = weakTypeOf[Channel]
@ -97,19 +119,30 @@ object Ask {
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
out match {
case x :: Nil
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x)
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
reify(askFutureWrappedNoWrap[ReturnLUB](
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFutureNoWrap[ReturnLUB](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
case _
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
else
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice))
}
}
val wrapMessage = (m: Any) (new WrappedMessage[TNil, Any](m): Any)
@inline def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = {
def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]]
}
@ -124,4 +157,17 @@ object Ask {
future flatMap (w akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]])
}
def askOpsNoWrap[T](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] =
akka.pattern.ask(target, msg).asInstanceOf[Future[T]]
def askFutureNoWrap[T](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (m akka.pattern.ask(target, m).asInstanceOf[Future[T]])
}
def askFutureWrappedNoWrap[T](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
future flatMap (w akka.pattern.ask(target, w.value).asInstanceOf[Future[T]])
}
}

View file

@ -57,10 +57,12 @@ class ChannelDocSpec extends AkkaSpec {
import ChannelDocSpec._
class MsgA
class MsgB
class MsgC
class MsgD
trait Msg
class MsgA extends Msg
class MsgB extends Msg
class MsgC extends Msg
class MsgD extends Msg
"demonstrate why Typed Channels" in {
def someActor = testActor
@ -115,6 +117,7 @@ class ChannelDocSpec extends AkkaSpec {
implicit val timeout: Timeout = ??? // for the ask operations
val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ???
val channelA2: ChannelRef[(MsgA, MsgB) :+: (MsgA, MsgC) :+: TNil] = ???
val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ???
val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ???
@ -127,12 +130,15 @@ class ChannelDocSpec extends AkkaSpec {
channelA <-!- fA // eventually send the futures value to channelA
fA -!-> channelA // same thing as above
// ask the actor; return type given in full for illustration
val fB: Future[WrappedMessage[(MsgB, Nothing) :+: TNil, MsgB]] = channelA <-?- a
val fBunwrapped: Future[MsgB] = fB.lub
val fB: Future[MsgB] = channelA <-?- a // ask the actor
a -?-> channelA // same thing as above
// ask the actor with multiple reply types
// return type given in full for illustration
val fM: Future[WrappedMessage[ //
(MsgB, Nothing) :+: (MsgC, Nothing) :+: TNil, Msg]] = channelA2 <-?- a
val fMunwrapped: Future[Msg] = fM.lub
channelA <-?- fA // eventually ask the actor, return the future
fA -?-> channelA // same thing as above

View file

@ -169,10 +169,10 @@ Operations on typed channels are composable and obey a few simple rules:
* the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same
thing provided the arguments also switch places
* sending with ``-?->`` or ``<-?-`` always returns a
``Future[WrappedMessage[_, _]]`` representing all possible reply channels,
even if there is only one (use ``.lub`` to get a :class:`Future[_]` with the
most precise single type for the value)
* sending with ``-?->`` or ``<-?-`` returns a ``Future[WrappedMessage[_, _]]``
representing all possible reply channels if there is more than one (use
``.lub`` to get a :class:`Future[_]` with the most precise single type for
the value)
* sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new
:class:`Future[_]` which will be completed with the value after it has been