diff --git a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala index 088dceeeca..a771af2197 100644 --- a/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala +++ b/akka-channels-tests/src/test/scala/akka/channels/ChannelSpec.scala @@ -482,7 +482,9 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, }) val t = ChannelExt(system).actorOf(new Tester) val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) + val fa = Future successful a val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) + val fb = Future successful b t <-!- a expectMsg(C) a -!-> t @@ -491,6 +493,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, expectMsg(D) b -!-> t expectMsg(D) + t <-!- fa + expectMsg(C) + fa -!-> t + expectMsg(C) + t <-!- fb + expectMsg(D) + fb -!-> t + expectMsg(D) } "not be sendable with wrong channels" when { @@ -540,11 +550,17 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf, implicit val timeout = Timeout(1.second) val t = ChannelExt(system).actorOf(new Tester) val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A) + val fa = Future successful a val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B) + val fb = Future successful b (Await.result(t <-?- a, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C) (Await.result(a -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C) (Await.result(t <-?- b, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D) (Await.result(b -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D) + (Await.result(t <-?- fa, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C) + (Await.result(fa -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C) + (Await.result(t <-?- fb, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D) + (Await.result(fb -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D) } "not be askable with wrong channels" when { diff --git a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala index 34541a7b8e..5f23b6fa3f 100644 --- a/akka-channels/src/main/scala/akka/channels/ChannelRef.scala +++ b/akka-channels/src/main/scala/akka/channels/ChannelRef.scala @@ -21,7 +21,9 @@ case class NarrowingException(errors: String) extends AkkaException(errors) with class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal { - def <-!-[M](msg: M): Unit = macro macros.Tell.impl[T, M] + def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M] + + def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M] def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M] diff --git a/akka-channels/src/main/scala/akka/channels/Ops.scala b/akka-channels/src/main/scala/akka/channels/Ops.scala index b8a3019af4..2448f6a958 100644 --- a/akka-channels/src/main/scala/akka/channels/Ops.scala +++ b/akka-channels/src/main/scala/akka/channels/Ops.scala @@ -17,7 +17,7 @@ sealed trait ReplyChannels[T <: ChannelList] extends ChannelList /** * This type is used to stand in for the unknown reply types of the fabricated * sender references; users don’t need to write it down, and if they do, they - * know that they’re cheating (since these ref types must not escape their + * know that they’re cheating (since these ref types must not escape their * defining actor context). */ sealed trait UnknownDoNotWriteMeDown @@ -34,7 +34,7 @@ class ActorRefOps(val ref: ActorRef) extends AnyVal { } class FutureOps[T](val future: Future[T]) extends AnyVal { - def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureImpl[C, T] + def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureOpsImpl[C, T] def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[ChannelList, Any, C, T] def lub[LUB](implicit ev: T <:< WrappedMessage[_, LUB]): Future[LUB] = { implicit val ec = ExecutionContexts.sameThreadExecutionContext @@ -43,7 +43,7 @@ class FutureOps[T](val future: Future[T]) extends AnyVal { } class AnyOps[T](val value: T) extends AnyVal { - def -!->[C <: ChannelList](channel: ChannelRef[C]): Unit = macro macros.Tell.opsImpl[C, T] + def -!->[C <: ChannelList](channel: ChannelRef[C]): T = macro macros.Tell.opsImpl[C, T] def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[ChannelList, Any, C, T] } diff --git a/akka-channels/src/main/scala/akka/channels/macros/Ask.scala b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala index f4dbf475ce..267aee9115 100644 --- a/akka-channels/src/main/scala/akka/channels/macros/Ask.scala +++ b/akka-channels/src/main/scala/akka/channels/macros/Ask.scala @@ -26,15 +26,29 @@ object Ask { val tpeChannel = weakTypeOf[Channel] val tpeMsg = weakTypeOf[Msg] - val unwrapped = unwrapMsgType(c.universe)(tpeMsg) + val isFuture = tpeMsg <:< typeOf[Future[_]] + val unwrapped = + if (isFuture) + tpeMsg match { + case TypeRef(_, _, x :: _) ⇒ unwrapMsgType(c.universe)(x) + } + else unwrapMsgType(c.universe)(tpeMsg) val out = replyChannels(c.universe)(tpeChannel, unwrapped) Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel) implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) - reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( - c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) + if (isFuture) + if (unwrapped <:< typeOf[ChannelList]) + reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) + else + reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice)) + else + reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]]( + c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) } def opsImpl[ // @@ -61,6 +75,7 @@ object Ask { channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice)) } + // this is the implementation for Future[_] -?-> ChannelRef[_] def futureImpl[ // ReturnChannels <: ChannelList, // the precise type union describing the reply ReturnLUB, // the least-upper bound for the reply types @@ -80,7 +95,7 @@ object Ask { implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing])) implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out)) - if (tpeMsg <:< typeOf[ChannelList]) + if (tpeMsg <:< typeOf[WrappedMessage[_, _]]) reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]]( channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice)) else diff --git a/akka-channels/src/main/scala/akka/channels/macros/Tell.scala b/akka-channels/src/main/scala/akka/channels/macros/Tell.scala index c86f7e5d3c..13e4019f1b 100644 --- a/akka-channels/src/main/scala/akka/channels/macros/Tell.scala +++ b/akka-channels/src/main/scala/akka/channels/macros/Tell.scala @@ -16,42 +16,50 @@ object Tell { def impl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context { type PrefixType = ChannelRef[MyChannels] - })(msg: c.Expr[Msg]): c.Expr[Unit] = { - val tpeMyChannels = c.universe.weakTypeOf[MyChannels] - val tpeMsg = c.universe.weakTypeOf[Msg] - val (tpeSender, senderTree, sender) = getSenderChannel(c) - - verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels) - - c.universe.reify(c.prefix.splice.actorRef.tell(toMsg(c)(msg, tpeMsg).splice, sender.splice)) - } + })(msg: c.Expr[Msg]): c.Expr[Msg] = + doTell(c)(c.universe.weakTypeOf[MyChannels], c.universe.weakTypeOf[Msg], msg, c.prefix) def opsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context { type PrefixType = AnyOps[Msg] - })(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Unit] = { - val tpeMyChannels = c.universe.weakTypeOf[MyChannels] - val tpeMsg = c.universe.weakTypeOf[Msg] + })(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = { + import c.universe._ + doTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.value), channel) + } + + def doTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)( + tpeMyChannels: c.Type, tpeMsg: c.Type, msg: c.Expr[Msg], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = { val (tpeSender, senderTree, sender) = getSenderChannel(c) - verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels) + val cond = bool(c, tpeMsg <:< c.typeOf[WrappedMessage[_, _]]) + c.universe.reify { + val $m = msg.splice + target.splice.actorRef.tell(if (cond.splice) $m.asInstanceOf[WrappedMessage[TNil, Any]].value else $m, sender.splice) + $m + } + } - val msg = c.universe.reify(c.prefix.splice.value) - c.universe.reify(channel.splice.actorRef.tell(toMsg(c)(msg, tpeMsg).splice, sender.splice)) + def futureOpsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context { + type PrefixType = FutureOps[Msg] + })(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = { + import c.universe._ + doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.future), channel) } def futureImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context { - type PrefixType = FutureOps[Msg] - })(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = { - val tpeMyChannels = c.universe.weakTypeOf[MyChannels] - val tpeMsg = c.universe.weakTypeOf[Msg] - val (tpeSender, senderTree, sender) = getSenderChannel(c) - - verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels) - - c.universe.reify(pipeTo[Msg](c.prefix.splice, channel.splice, sender.splice)) + type PrefixType = ChannelRef[MyChannels] + })(future: c.Expr[Future[Msg]]): c.Expr[Future[Msg]] = { + import c.universe._ + doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], future, c.prefix) } - @inline def pipeTo[Msg](f: FutureOps[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] = + def doFutureTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)( + tpeMyChannels: c.Type, tpeMsg: c.Type, future: c.Expr[Future[Msg]], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = { + val (tpeSender, senderTree, sender) = getSenderChannel(c) + verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels) + c.universe.reify(pipeTo[Msg](future.splice, target.splice, sender.splice)) + } + + @inline def pipeTo[Msg](f: Future[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] = f.future.andThen { case Success(s: WrappedMessage[_, _]) ⇒ c.actorRef.tell(s.value, snd) case Success(s) ⇒ c.actorRef.tell(s, snd) @@ -76,12 +84,12 @@ object Tell { def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit = if (msg.nonEmpty) { val u: c.universe.type = c.universe - val replies = msg map (m ⇒ m -> (replyChannels(u)(chT, m) map (t => ignoreUnknown(t)))) + val replies = msg map (m ⇒ m -> (replyChannels(u)(chT, m) map (t ⇒ ignoreUnknown(t)))) val missing = replies collect { case (k, v) if v.size == 0 ⇒ k } if (missing.nonEmpty) error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)") else { - val nextSend = replies.map(_._2).flatten map (m ⇒ m -> (replyChannels(u)(sndT, m) map (t => ignoreUnknown(t)))) + val nextSend = replies.map(_._2).flatten map (m ⇒ m -> (replyChannels(u)(sndT, m) map (t ⇒ ignoreUnknown(t)))) val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k } if (nextMissing.nonEmpty) error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)") diff --git a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala index 8d1553a72d..0806e6aad1 100644 --- a/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/channels/ChannelDocSpec.scala @@ -91,7 +91,7 @@ class ChannelDocSpec extends AkkaSpec { channelA <-!- a // send a to channelA a -!-> channelA // same thing as above - //channelA <-!- fA // eventually send the future’s value to channelA + channelA <-!- fA // eventually send the future’s value to channelA fA -!-> channelA // same thing as above // ask the actor; return type given in full for illustration @@ -100,7 +100,7 @@ class ChannelDocSpec extends AkkaSpec { a -?-> channelA // same thing as above - //channelA <-?- fA // eventually ask the actor, return the future + channelA <-?- fA // eventually ask the actor, return the future fA -?-> channelA // same thing as above // chaining works as well