diff --git a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala index a06a0647dc..50c46781c6 100644 --- a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala +++ b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala @@ -95,7 +95,7 @@ object ChannelSpec { implicit val timeout = Timeout(1.second) channel[T] { (x, snd) ⇒ val xx: WrappedMessage[T, Any] = x - val f: Future[WrappedMessage[(ReplyChannels[T], Nothing) :+: TNil, ReplyChannels[T]]] = target <-?- x + val f: Future[ReplyChannels[T]] = target <-?- x f -!-> snd } import language.existentials @@ -415,7 +415,7 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, "support typed ask" in { val t = ChannelExt(system).actorOf(new Tester, "t21") implicit val timeout = Timeout(1.second) - val r: Future[C] = (t <-?- A).lub + val r: Future[C] = t <-?- A Await.result(r, 1.second) must be(C) } @@ -471,12 +471,22 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, lastSender must be === t.actorRef } + "not wrap Futures unnecessarily" in { + val a = ChannelExt(system).actorOf(new Tester, "t26a") + implicit val timeout = Timeout(1.second) + val c1: C = Await.result(a <-?- A, 1.second) + val c2: C = Await.result(A -?-> a, 1.second) + val fA = Future successful A + val c3: C = Await.result(a <-?- fA, 1.second) + val c4: C = Await.result(fA -?-> a, 1.second) + } + "be able to transform Futures" in { val client = new ChannelRef[(Any, Nothing) :+: TNil](testActor) val someActor = ChannelExt(system).actorOf(new Tester, "t26b") implicit val timeout = Timeout(1.second) implicit val ec = system.dispatcher - A -?-> someActor -*-> (_ map (_.value match { case C ⇒ B })) -?-> someActor -!-> client + A -?-> someActor -*-> (_ map { case C ⇒ B }) -?-> someActor -!-> client expectMsg(D) } diff --git a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala index 7fe2ee2591..80adae254d 100644 --- a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala +++ b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala @@ -32,7 +32,7 @@ class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { * which will be completed with the reply message or a TimeoutException. * If the message is a Future itself, eventually send the Future’s value. */ - def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M] + def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, Any, T, M] /** * Narrow this ChannelRef by removing channels or narrowing input types or diff --git a/akka-channels/src/main/scala/akka/channels/macros/Ask.scala b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala index ca43f8eddb..628dc467d7 100644 --- a/akka-channels/src/main/scala/akka/channels/macros/Ask.scala +++ b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala @@ -21,11 +21,12 @@ object Ask { def impl[ // ReturnChannels <: ChannelList, // the precise type union describing the reply ReturnLUB, // the least-upper bound for the reply types + ReturnT, // the return type if it is just a single type Channel <: ChannelList: c.WeakTypeTag, // the channel being asked Msg: c.WeakTypeTag // the message being sent down the channel ](c: Context { type PrefixType = ChannelRef[Channel] - })(msg: c.Expr[Msg]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { + })(msg: c.Expr[Msg]): c.Expr[Future[_]] = { import c.universe._ val tpeChannel = weakTypeOf[Channel] @@ -41,18 +42,32 @@ object Ask { Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) - implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) - implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) - if (isFuture) - if (unwrapped <:< typeOf[ChannelList]) - reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( - c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) - else - reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( - c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice)) - else - reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( - c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + implicit lazy val ttReturn = c.TypeTag[ReturnT](out.head) + implicit lazy val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) + implicit lazy val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) + + out match { + case x :: Nil if isFuture ⇒ + if (unwrapped <:< typeOf[ChannelList]) + reify(askFutureWrappedNoWrap[ReturnT]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) + else + reify(askFutureNoWrap[ReturnT]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice)) + case x :: Nil ⇒ + reify(askOpsNoWrap[ReturnT]( + c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + case _ if isFuture ⇒ + if (unwrapped <:< typeOf[ChannelList]) + reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) + else + reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice)) + case _ ⇒ + reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + } } def opsImpl[ // @@ -62,7 +77,7 @@ object Ask { Msg: c.WeakTypeTag // the message being sent down the channel ](c: Context { type PrefixType = AnyOps[Msg] - })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { + })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = { import c.universe._ val tpeChannel = weakTypeOf[Channel] @@ -72,11 +87,18 @@ object Ask { Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) - implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) - implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) val msg = reify(c.prefix.splice.value) - reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( - channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + out match { + case x :: Nil ⇒ + implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x) + reify(askOpsNoWrap[ReturnLUB]( + channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + case _ ⇒ + implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) + implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) + reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( + channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + } } // this is the implementation for Future[_] -?-> ChannelRef[_] @@ -87,7 +109,7 @@ object Ask { Msg: c.WeakTypeTag // the message being sent down the channel ](c: Context { type PrefixType = FutureOps[Msg] - })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[WrappedMessage[ReturnChannels, ReturnLUB]]] = { + })(channel: c.Expr[ChannelRef[Channel]]): c.Expr[Future[_]] = { import c.universe._ val tpeChannel = weakTypeOf[Channel] @@ -97,19 +119,30 @@ object Ask { Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) - implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) - implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) - if (tpeMsg <:< typeOf[WrappedMessage[_, _]]) - reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( - channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) - else - reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( - channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice)) + out match { + case x :: Nil ⇒ + implicit val ttReturnLUB = c.TypeTag[ReturnLUB](x) + if (tpeMsg <:< typeOf[WrappedMessage[_, _]]) + reify(askFutureWrappedNoWrap[ReturnLUB]( + channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) + else + reify(askFutureNoWrap[ReturnLUB]( + channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice)) + case _ ⇒ + implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) + implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) + if (tpeMsg <:< typeOf[WrappedMessage[_, _]]) + reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( + channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) + else + reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( + channel.splice.actorRef, c.prefix.splice.future)(imp[Timeout](c).splice)) + } } val wrapMessage = (m: Any) ⇒ (new WrappedMessage[TNil, Any](m): Any) - @inline def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = { + def askOps[T <: WrappedMessage[_, _]](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = { implicit val ec = ExecutionContexts.sameThreadExecutionContext akka.pattern.ask(target, msg).map(wrapMessage).asInstanceOf[Future[T]] } @@ -124,4 +157,17 @@ object Ask { future flatMap (w ⇒ akka.pattern.ask(target, w.value).map(wrapMessage).asInstanceOf[Future[T]]) } + def askOpsNoWrap[T](target: ActorRef, msg: Any)(implicit t: Timeout): Future[T] = + akka.pattern.ask(target, msg).asInstanceOf[Future[T]] + + def askFutureNoWrap[T](target: ActorRef, future: Future[_])(implicit t: Timeout): Future[T] = { + implicit val ec = ExecutionContexts.sameThreadExecutionContext + future flatMap (m ⇒ akka.pattern.ask(target, m).asInstanceOf[Future[T]]) + } + + def askFutureWrappedNoWrap[T](target: ActorRef, future: Future[WrappedMessage[_, _]])(implicit t: Timeout): Future[T] = { + implicit val ec = ExecutionContexts.sameThreadExecutionContext + future flatMap (w ⇒ akka.pattern.ask(target, w.value).asInstanceOf[Future[T]]) + } + } \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala index f45247c8ad..c0e0bede9e 100644 --- a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala @@ -57,10 +57,12 @@ class ChannelDocSpec extends AkkaSpec { import ChannelDocSpec._ - class MsgA - class MsgB - class MsgC - class MsgD + trait Msg + + class MsgA extends Msg + class MsgB extends Msg + class MsgC extends Msg + class MsgD extends Msg "demonstrate why Typed Channels" in { def someActor = testActor @@ -115,6 +117,7 @@ class ChannelDocSpec extends AkkaSpec { implicit val timeout: Timeout = ??? // for the ask operations val channelA: ChannelRef[(MsgA, MsgB) :+: TNil] = ??? + val channelA2: ChannelRef[(MsgA, MsgB) :+: (MsgA, MsgC) :+: TNil] = ??? val channelB: ChannelRef[(MsgB, MsgC) :+: TNil] = ??? val channelC: ChannelRef[(MsgC, MsgD) :+: TNil] = ??? @@ -127,12 +130,15 @@ class ChannelDocSpec extends AkkaSpec { channelA <-!- fA // eventually send the future’s value to channelA fA -!-> channelA // same thing as above - // ask the actor; return type given in full for illustration - val fB: Future[WrappedMessage[(MsgB, Nothing) :+: TNil, MsgB]] = channelA <-?- a - val fBunwrapped: Future[MsgB] = fB.lub - + val fB: Future[MsgB] = channelA <-?- a // ask the actor a -?-> channelA // same thing as above + // ask the actor with multiple reply types + // return type given in full for illustration + val fM: Future[WrappedMessage[ // + (MsgB, Nothing) :+: (MsgC, Nothing) :+: TNil, Msg]] = channelA2 <-?- a + val fMunwrapped: Future[Msg] = fM.lub + channelA <-?- fA // eventually ask the actor, return the future fA -?-> channelA // same thing as above diff --git a/akka-docs/rst/scala/typed-channels.rst b/akka-docs/rst/scala/typed-channels.rst index 0ea47bdfc0..e2a300791a 100644 --- a/akka-docs/rst/scala/typed-channels.rst +++ b/akka-docs/rst/scala/typed-channels.rst @@ -169,10 +169,10 @@ Operations on typed channels are composable and obey a few simple rules: * the operators are fully symmetric, i.e. ``-!->`` and ``<-!-`` do the same thing provided the arguments also switch places -* sending with ``-?->`` or ``<-?-`` always returns a - ``Future[WrappedMessage[_, _]]`` representing all possible reply channels, - even if there is only one (use ``.lub`` to get a :class:`Future[_]` with the - most precise single type for the value) +* sending with ``-?->`` or ``<-?-`` returns a ``Future[WrappedMessage[_, _]]`` + representing all possible reply channels if there is more than one (use + ``.lub`` to get a :class:`Future[_]` with the most precise single type for + the value) * sending a :class:`Future[_]` with ``-!->`` or ``<-!-`` returns a new :class:`Future[_]` which will be completed with the value after it has been