make all arrows invertible
This commit is contained in:
parent
5e763bbb38
commit
f86fa61613
6 changed files with 78 additions and 37 deletions
|
|
@ -482,7 +482,9 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
})
|
||||
val t = ChannelExt(system).actorOf(new Tester)
|
||||
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
|
||||
val fa = Future successful a
|
||||
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
|
||||
val fb = Future successful b
|
||||
t <-!- a
|
||||
expectMsg(C)
|
||||
a -!-> t
|
||||
|
|
@ -491,6 +493,14 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
expectMsg(D)
|
||||
b -!-> t
|
||||
expectMsg(D)
|
||||
t <-!- fa
|
||||
expectMsg(C)
|
||||
fa -!-> t
|
||||
expectMsg(C)
|
||||
t <-!- fb
|
||||
expectMsg(D)
|
||||
fb -!-> t
|
||||
expectMsg(D)
|
||||
}
|
||||
|
||||
"not be sendable with wrong channels" when {
|
||||
|
|
@ -540,11 +550,17 @@ class ChannelSpec extends AkkaSpec(ActorSystem("ChannelSpec", AkkaSpec.testConf,
|
|||
implicit val timeout = Timeout(1.second)
|
||||
val t = ChannelExt(system).actorOf(new Tester)
|
||||
val a = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](A)
|
||||
val fa = Future successful a
|
||||
val b = new WrappedMessage[(A, Nothing) :+:(B, Nothing) :+: TNil, Msg](B)
|
||||
val fb = Future successful b
|
||||
(Await.result(t <-?- a, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
|
||||
(Await.result(a -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
|
||||
(Await.result(t <-?- b, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
|
||||
(Await.result(b -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
|
||||
(Await.result(t <-?- fa, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
|
||||
(Await.result(fa -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(C)
|
||||
(Await.result(t <-?- fb, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
|
||||
(Await.result(fb -?-> t, timeout.duration): WrappedMessage[(C, Nothing) :+: (D, Nothing) :+: TNil, Msg]).value must be(D)
|
||||
}
|
||||
|
||||
"not be askable with wrong channels" when {
|
||||
|
|
|
|||
|
|
@ -21,7 +21,9 @@ case class NarrowingException(errors: String) extends AkkaException(errors) with
|
|||
|
||||
class ChannelRef[+T <: ChannelList](val actorRef: ActorRef) extends AnyVal {
|
||||
|
||||
def <-!-[M](msg: M): Unit = macro macros.Tell.impl[T, M]
|
||||
def <-!-[M](msg: M): M = macro macros.Tell.impl[T, M]
|
||||
|
||||
def <-!-[M](future: Future[M]): Future[M] = macro macros.Tell.futureImpl[T, M]
|
||||
|
||||
def <-?-[M](msg: M): Future[_] = macro macros.Ask.impl[ChannelList, Any, T, M]
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ class ActorRefOps(val ref: ActorRef) extends AnyVal {
|
|||
}
|
||||
|
||||
class FutureOps[T](val future: Future[T]) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureImpl[C, T]
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Future[T] = macro macros.Tell.futureOpsImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.futureImpl[ChannelList, Any, C, T]
|
||||
def lub[LUB](implicit ev: T <:< WrappedMessage[_, LUB]): Future[LUB] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
|
|
@ -43,7 +43,7 @@ class FutureOps[T](val future: Future[T]) extends AnyVal {
|
|||
}
|
||||
|
||||
class AnyOps[T](val value: T) extends AnyVal {
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): Unit = macro macros.Tell.opsImpl[C, T]
|
||||
def -!->[C <: ChannelList](channel: ChannelRef[C]): T = macro macros.Tell.opsImpl[C, T]
|
||||
def -?->[C <: ChannelList](channel: ChannelRef[C]): Future[_] = macro macros.Ask.opsImpl[ChannelList, Any, C, T]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,13 +26,27 @@ object Ask {
|
|||
|
||||
val tpeChannel = weakTypeOf[Channel]
|
||||
val tpeMsg = weakTypeOf[Msg]
|
||||
val unwrapped = unwrapMsgType(c.universe)(tpeMsg)
|
||||
val isFuture = tpeMsg <:< typeOf[Future[_]]
|
||||
val unwrapped =
|
||||
if (isFuture)
|
||||
tpeMsg match {
|
||||
case TypeRef(_, _, x :: _) ⇒ unwrapMsgType(c.universe)(x)
|
||||
}
|
||||
else unwrapMsgType(c.universe)(tpeMsg)
|
||||
val out = replyChannels(c.universe)(tpeChannel, unwrapped)
|
||||
|
||||
Tell.verify(c)(null, unwrapped, typeOf[(Any, Nothing) :+: TNil], tpeChannel)
|
||||
|
||||
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
|
||||
if (isFuture)
|
||||
if (unwrapped <:< typeOf[ChannelList])
|
||||
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askFuture[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, msg.splice.asInstanceOf[Future[Any]])(imp[Timeout](c).splice))
|
||||
else
|
||||
reify(askOps[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
c.prefix.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
}
|
||||
|
|
@ -61,6 +75,7 @@ object Ask {
|
|||
channel.splice.actorRef, toMsg(c)(msg, tpeMsg).splice)(imp[Timeout](c).splice))
|
||||
}
|
||||
|
||||
// this is the implementation for Future[_] -?-> ChannelRef[_]
|
||||
def futureImpl[ //
|
||||
ReturnChannels <: ChannelList, // the precise type union describing the reply
|
||||
ReturnLUB, // the least-upper bound for the reply types
|
||||
|
|
@ -80,7 +95,7 @@ object Ask {
|
|||
|
||||
implicit val ttReturnChannels = c.TypeTag[ReturnChannels](toChannels(c.universe)(out, weakTypeOf[Nothing]))
|
||||
implicit val ttReturnLUB = c.TypeTag[ReturnLUB](c.universe.lub(out))
|
||||
if (tpeMsg <:< typeOf[ChannelList])
|
||||
if (tpeMsg <:< typeOf[WrappedMessage[_, _]])
|
||||
reify(askFutureWrapped[WrappedMessage[ReturnChannels, ReturnLUB]](
|
||||
channel.splice.actorRef, c.prefix.splice.future.asInstanceOf[Future[WrappedMessage[TNil, Any]]])(imp[Timeout](c).splice))
|
||||
else
|
||||
|
|
|
|||
|
|
@ -16,42 +16,50 @@ object Tell {
|
|||
|
||||
def impl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = ChannelRef[MyChannels]
|
||||
})(msg: c.Expr[Msg]): c.Expr[Unit] = {
|
||||
val tpeMyChannels = c.universe.weakTypeOf[MyChannels]
|
||||
val tpeMsg = c.universe.weakTypeOf[Msg]
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
|
||||
c.universe.reify(c.prefix.splice.actorRef.tell(toMsg(c)(msg, tpeMsg).splice, sender.splice))
|
||||
}
|
||||
})(msg: c.Expr[Msg]): c.Expr[Msg] =
|
||||
doTell(c)(c.universe.weakTypeOf[MyChannels], c.universe.weakTypeOf[Msg], msg, c.prefix)
|
||||
|
||||
def opsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = AnyOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Unit] = {
|
||||
val tpeMyChannels = c.universe.weakTypeOf[MyChannels]
|
||||
val tpeMsg = c.universe.weakTypeOf[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
|
||||
import c.universe._
|
||||
doTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.value), channel)
|
||||
}
|
||||
|
||||
def doTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
|
||||
tpeMyChannels: c.Type, tpeMsg: c.Type, msg: c.Expr[Msg], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Msg] = {
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
val cond = bool(c, tpeMsg <:< c.typeOf[WrappedMessage[_, _]])
|
||||
c.universe.reify {
|
||||
val $m = msg.splice
|
||||
target.splice.actorRef.tell(if (cond.splice) $m.asInstanceOf[WrappedMessage[TNil, Any]].value else $m, sender.splice)
|
||||
$m
|
||||
}
|
||||
}
|
||||
|
||||
val msg = c.universe.reify(c.prefix.splice.value)
|
||||
c.universe.reify(channel.splice.actorRef.tell(toMsg(c)(msg, tpeMsg).splice, sender.splice))
|
||||
def futureOpsImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = FutureOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
|
||||
import c.universe._
|
||||
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], reify(c.prefix.splice.future), channel)
|
||||
}
|
||||
|
||||
def futureImpl[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context {
|
||||
type PrefixType = FutureOps[Msg]
|
||||
})(channel: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
|
||||
val tpeMyChannels = c.universe.weakTypeOf[MyChannels]
|
||||
val tpeMsg = c.universe.weakTypeOf[Msg]
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
|
||||
c.universe.reify(pipeTo[Msg](c.prefix.splice, channel.splice, sender.splice))
|
||||
type PrefixType = ChannelRef[MyChannels]
|
||||
})(future: c.Expr[Future[Msg]]): c.Expr[Future[Msg]] = {
|
||||
import c.universe._
|
||||
doFutureTell(c)(weakTypeOf[MyChannels], weakTypeOf[Msg], future, c.prefix)
|
||||
}
|
||||
|
||||
@inline def pipeTo[Msg](f: FutureOps[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] =
|
||||
def doFutureTell[MyChannels <: ChannelList: c.WeakTypeTag, Msg: c.WeakTypeTag](c: Context)(
|
||||
tpeMyChannels: c.Type, tpeMsg: c.Type, future: c.Expr[Future[Msg]], target: c.Expr[ChannelRef[MyChannels]]): c.Expr[Future[Msg]] = {
|
||||
val (tpeSender, senderTree, sender) = getSenderChannel(c)
|
||||
verify(c)(senderTree, unwrapMsgType(c.universe)(tpeMsg), tpeSender, tpeMyChannels)
|
||||
c.universe.reify(pipeTo[Msg](future.splice, target.splice, sender.splice))
|
||||
}
|
||||
|
||||
@inline def pipeTo[Msg](f: Future[Msg], c: ChannelRef[_], snd: ActorRef): Future[Msg] =
|
||||
f.future.andThen {
|
||||
case Success(s: WrappedMessage[_, _]) ⇒ c.actorRef.tell(s.value, snd)
|
||||
case Success(s) ⇒ c.actorRef.tell(s, snd)
|
||||
|
|
@ -76,12 +84,12 @@ object Tell {
|
|||
def rec(msg: Set[c.universe.Type], checked: Set[c.universe.Type], depth: Int): Unit =
|
||||
if (msg.nonEmpty) {
|
||||
val u: c.universe.type = c.universe
|
||||
val replies = msg map (m ⇒ m -> (replyChannels(u)(chT, m) map (t => ignoreUnknown(t))))
|
||||
val replies = msg map (m ⇒ m -> (replyChannels(u)(chT, m) map (t ⇒ ignoreUnknown(t))))
|
||||
val missing = replies collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (missing.nonEmpty)
|
||||
error(c, s"target ChannelRef does not support messages of types ${missing mkString ", "} (at depth $depth)")
|
||||
else {
|
||||
val nextSend = replies.map(_._2).flatten map (m ⇒ m -> (replyChannels(u)(sndT, m) map (t => ignoreUnknown(t))))
|
||||
val nextSend = replies.map(_._2).flatten map (m ⇒ m -> (replyChannels(u)(sndT, m) map (t ⇒ ignoreUnknown(t))))
|
||||
val nextMissing = nextSend collect { case (k, v) if v.size == 0 ⇒ k }
|
||||
if (nextMissing.nonEmpty)
|
||||
error(c, s"implicit sender `$sender` does not support messages of the reply types ${nextMissing mkString ", "} (at depth $depth)")
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ class ChannelDocSpec extends AkkaSpec {
|
|||
channelA <-!- a // send a to channelA
|
||||
a -!-> channelA // same thing as above
|
||||
|
||||
//channelA <-!- fA // eventually send the future’s value to channelA
|
||||
channelA <-!- fA // eventually send the future’s value to channelA
|
||||
fA -!-> channelA // same thing as above
|
||||
|
||||
// ask the actor; return type given in full for illustration
|
||||
|
|
@ -100,7 +100,7 @@ class ChannelDocSpec extends AkkaSpec {
|
|||
|
||||
a -?-> channelA // same thing as above
|
||||
|
||||
//channelA <-?- fA // eventually ask the actor, return the future
|
||||
channelA <-?- fA // eventually ask the actor, return the future
|
||||
fA -?-> channelA // same thing as above
|
||||
|
||||
// chaining works as well
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue